一张图概览线程池的处理逻辑

ThreadPoolExecutor的核心参数

  • 先来看看全参构造器
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
  TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
  RejectedExecutionHandler handler) 

corePoolSize(核心线程大小)

  • poolSize小于corePoolSize时,线程池会创建一个新的线程来处理任务,即使其它空闲的线程能处理新任务也会创建线程
  • 调用线程池的prestartAllCoreThreads()方法会提前创建,并且启动所有基本线程。

maximumPoolSize(线程池的最大数量)

  • 如果任务队列满了,并且已创建的线程数,小于最大数量,线程池就会创建一个新的线程来处理。
  • 如果workQueue无界队列,那么这个参数就没效果

keepAliveTime(线程存活时间)

  • 线程的工作线程空闲后保持存活的世界,默认情况下当线程池的线程数小于corePoolSize时生效。当allowCoreThreadTimeOut设置为true时核心线程超时也会被回收
  • 如果任务很多,每个任务执行时间比较短,可以适当调大时间,提高线程利用率

workQueue(任务队列)

  • 用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • priorityBlockingQueue:一个具有优先级的无限阻塞队列。

threadFactory(线程工厂)

  • 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

  • 使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字,代码如下。

    new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();

handler(饱和策略)

当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。

  • AbortPolicy:直接抛出异常。
  • CallerRunsPolicy:调用任务的run()方法绕过线程池直接执行。
  • DiscardOldestPolicy:丢弃队列里最近节点的一个任务,并执行当前任务。
  • DiscardPolicy:不处理,丢弃掉。

小结

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。根据不同的拒绝策略去处理。

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

核心源码剖析

处理线程池状态和工作队列的参数 - ctl

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
   // 29
   private static final int COUNT_BITS = Integer.SIZE - 3; 
   // 000-1(29个1)
   private static final int CAPACITY  = (1 << COUNT_BITS) - 1;

   //用左边3位,实现5种线程状态
   //111-0,,,  此状态表示线程池能接收新的任务,十进制为: -536,870,912
   private static final int RUNNING    = -1 << COUNT_BITS;

   //000-0...  此此状态不接受新的任务,但是可以继续执行队列中的任务
   private static final int SHUTDOWN   =  0 << COUNT_BITS;

   //001-0...  此状态全面拒绝,并中断正在处理的任务 十进制为: 536,870,912
   private static final int STOP       =  1 << COUNT_BITS;

   //010-0...  此状态表示所有任务已经被终止
   private static final int TIDYING    =  2 << COUNT_BITS;

   //011-0...  此状态表示已经清理完现场
   private static final int TERMINATED =  3 << COUNT_BITS;

   // ~掩码取反
   private static int runStateOf(int c)     { return c & ~CAPACITY; }
   // 获取工作线程数目
   private static int workerCountOf(int c)  { return c & CAPACITY; }
   // 合并rs(工作状态)和wc(工作线程数) 或运算 到一个32位数中
   private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池状态及状态切换

  • RUNNING:能接受新任务,并处理阻塞队列中的任务
  • SHUTDOWN:不接受新任务,但是可以处理阻塞队列中的任务
  • STOP:不接受新任务,并且不处理阻塞队列中的任务,并且还打断正在运行任务的线程,就是直接撂担子不干了!
  • TIDYING:所有任务都终止,并且工作线程也为0,处于关闭之前的状态
  • TERMINATED:已关闭。

execute方法

public void execute(Runnable command) {
    //如果提交了空的任务 抛出异常
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();//获取当前线程池的状态
    //检查当前工作线程数量是否小于核心线程数量
    if (workerCountOf(c) < corePoolSize) {
        // 通过addWorker方法提交任务
        if (addWorker(command, true))
            return;
        c = ctl.get();//如果提交失败 需要二次检查状态
    }
    //向工作线程提交任务,如果线程池为running,则将线程加入等待队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次检查状态
        int recheck = ctl.get();
        //如果重新检查发现线程池不为ruuning,就从队列中移除线程,并执行拒绝策略
        if (!isRunning(recheck) && remove(command))
            reject(command);
        //如果之前的线程已经消费完,就新建一个线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //如果核心池和等待队列都满了,则尝试创建一个新线程
    else if (!addWorker(command, false))
        //如果addWork()返回false,则执行拒绝策略
        reject(command);
}

Worker类

  • Worker
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * 这里传入任务,并将Worker自己放入到thread中
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //
            this.thread = getThreadFactory().newThread(this);
        }

        /** 后续工作线程执行的时候触发这里  */
        public void run() {
            runWorker(this);
        }
    }

addWorker方法

  • 这个方法是任务提交的一个核心方法。在里面完成了状态检查、新建任务、执行任务等一系列动作。
/**
 * 检查是否能添加一个新的任务线程,如果可以则创建并启动任务
 * 返回false的可能性如下:
 * 1.线程池不为RUNNING状态
 * 2.线程工厂创建新的任务线程失败
 * firstTask: 外部启动线程池时需要构造的第一个线程
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    //循环标记
       retry:
   //死循环更新状态
       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);//获取运行状态

       //检查线程池是否处于关闭状态
           if (rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty()))
               return false;

           for (;;) {
         // 获取当前工作线程数量
               int wc = workerCountOf(c);
       //如果已经超过corePoolSize获取maximumPoolSize 返回false
               if (wc >= CAPACITY ||
                   wc >= (core ? corePoolSize : maximumPoolSize))
                   return false;
       //CAS增加一个工作线程
               if (compareAndIncrementWorkerCount(c))
                break retry;
       //再次获取状态
               c = ctl.get();  // Re-read ctl
       //如果状态更新失败 则循环更新
               if (runStateOf(c) != rs)
                   continue retry;
               // else CAS failed due to workerCount change; retry inner loop
           }
       }

       boolean workerStarted = false;
       boolean workerAdded = false;
       Worker w = null;
       try {
           w = new Worker(firstTask);//初始化一个工作线程
           final Thread t = w.thread;
           if (t != null) {
               // 获得全局锁
               final ReentrantLock mainLock = this.mainLock;
               mainLock.lock();
               try {
                   // Recheck while holding lock.
                   // Back out on ThreadFactory failure or if
                   // shut down before lock acquired.
                   int rs = runStateOf(ctl.get());

                   if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                         // 添加工作到hashset中保存
                        workers.add(w);
                        int s = workers.size();
                         if (s > largestPoolSize)
                        largestPoolSize = s;
                        workerAdded = true;
                   }
               } finally {
                   mainLock.unlock();
               }
               if (workerAdded) {
                   // 工作线程启动 执行第一个任务 就是新提交的任务
                   t.start();
                   workerStarted = true;
               }
           }
       } finally {
           if (! workerStarted)
               addWorkerFailed(w);
       }
       return workerStarted;
   }

runWorker方法

  • 在看addWorker时发现,当工作线程创建成功后,会调用t.start()我们知道它实际执行的就是Worker对象的run()方法,而workerrun()方法是这样定义的:

    public void run() {
        runWorker(this);
    }
  • 它实际上是将自己委托给线程池的runWorker方法,不断执行我们提交的任务的run方法。而这个任务可能是我们新提交的,也有可能是从等待队列中获取的。这样就实现了线程池的完成逻辑。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
           try {
           //不断地从blockingQueue获取任务
               while (task != null || (task = getTask()) != null) {
                   w.lock();
                   // If pool is stopping, ensure thread is interrupted;
                   // if not, ensure thread is not interrupted.  This
                   // requires a recheck in second case to deal with
                   // shutdownNow race while clearing interrupt
                   if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                         runStateAtLeast(ctl.get(), STOP))) &&
                       !wt.isInterrupted())
                       wt.interrupt();
                   try {
                           //执行beforeExecute方法
                       beforeExecute(wt, task);
                       Throwable thrown = null;
                       try {
                           //调用Runable的run方法
                        task.run();
                       } catch (RuntimeException x) {
                         thrown = x; throw x;
                        } catch (Error x) {
                        thrown = x; throw x;
                         } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                        } finally {
                        // 执行aferExecute方法
                        afterExecute(task, thrown);
                       }
                   } finally {
                       task = null;
                       w.completedTasks++;
                       w.unlock();
                   }
               }
               completedAbruptly = false;
           } finally {
               processWorkerExit(w, completedAbruptly);
           }
    }

自己动手实现一个简易线程池

线程池类

@Slf4j
public class SimpleThreadPoolExecutor {

    private final BlockingQueue<Runnable> blockingQueue;

    public SimpleThreadPoolExecutor(BlockingQueue<Runnable> blockingQueue, int threadSize) {
        this.blockingQueue = blockingQueue;
        Set<SimpleThread> workThreads = new HashSet<>();
        for (int i = 0; i < threadSize; i++) {
            SimpleThread simpleThread = new SimpleThread("simpleThread-" + i);
            simpleThread.start();
            workThreads.add(simpleThread);
        }
    }

    public void execute(Runnable task) {
        blockingQueue.add(task);
    }

    @Data
    private final class SimpleThread extends Thread {

        public SimpleThread(@NotNull String name) {
            super(name);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    // 这里如果没有任务,会一直阻塞(底层调了LockSupport.park(this))
                    Runnable task = blockingQueue.take();
                    task.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    log.error("【获取任务发生异常】, 错误信息 = {}", ExceptionUtils.getRootCauseMessage(e));
                }
            }
        }
    }

}

测试

public static void main(String[] args) {
    SimpleThreadPoolExecutor executor =
        new SimpleThreadPoolExecutor(new LinkedBlockingQueue<>(), Runtime.getRuntime().availableProcessors());
    Random r = new Random();
    for (int i = 0; i < 100; i++) {
        executor.execute(() -> System.err.println(Thread.currentThread().getName() + ": 执行结果为" + r.nextInt(100)));
    }
}

-------------  测试结果
simpleThread-0: 执行结果为51
simpleThread-1: 执行结果为55
simpleThread-4: 执行结果为74
simpleThread-7: 执行结果为21
simpleThread-4: 执行结果为36
simpleThread-1: 执行结果为57
simpleThread-6: 执行结果为81
......

几道关于线程池的问题及解答

问题一

如果线程数小于核心线程数,并且线程都处于空闲状态,现提交一个任务,是新起一个线程还是给之前创建的线程?

  • 线程池会新起一个线程来执行这个新任务,不管老线程是否空闲。

问题二

如果线程池中的线程在执行任务的时候,抛异常了,会怎么样?

  • 查看runWorker方法,如果发生异常会进入到afterExecute(task, thrown), 并且也继续被抛了出来 ,进入processWorkerExit(w, completedAbruptly)方法
  • 简而言之把这个线程废了,然后新建一个线程。
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 移除了线程的引用, gc会处理
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 新加入一个线程
            addWorker(null, false);
        }
    }

问题三

线程池的核心线程在空闲的时候一定不会被回收吗?

  • 有个 allowCoreThreadTimeOut 方法,把它设置为 true ,则所有线程都会超时时间,不会有核心数那条线的存在。具体是会调用 interruptIdleWorkers 这个方法

问题四

线程池如何动态修改核心线程数和最大线程数?

其实之所以会有这样的需求是因为线程数是真的不好配置。

你可能会在网上或者书上看到很多配置公式,比如:

  • CPU 密集型的话,核心线程数设置为 CPU核数+1
  • I/O 密集型的话,核心线程数设置为 2 * CPU核数

比如:线程数=CPU核数 *(1+线程等待时间 / 线程时间运行时间)

这个比上面的更贴合与业务,还有一些理想的公式就不列了。就这个公式而言,这个线程等待时间就很难测,拿 Tomcat 线程池为例,每个请求的等待时间能知道?不同的请求不同的业务,就算相同的业务,不同的用户数据量也不同,等待时间也不同。

所以说线程数真的很难通过一个公式一劳永逸,线程数的设定是一个迭代的过程,需要压测适时调整,以上的公式做个初始值开始调试是 ok 的。

再者,流量的突发性也是无法判断的,举个例子 1 秒内一共有 1000 个请求量,但是如果这 1000 个请求量都是在第一毫秒内瞬时进来的呢?


java   线程池      java 多线程

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!

分布式事务解决方案-Seata 下一篇