一张图概览线程池的处理逻辑
ThreadPoolExecutor的核心参数
- 先来看看全参构造器
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize(核心线程大小)
- 当
poolSize
小于corePoolSize
时,线程池会创建一个新的线程来处理任务,即使其它空闲的线程能处理新任务也会创建线程 - 调用线程池的
prestartAllCoreThreads()
方法会提前创建,并且启动所有基本线程。
maximumPoolSize(线程池的最大数量)
- 如果任务队列满了,并且已创建的线程数,小于最大数量,线程池就会创建一个新的线程来处理。
- 如果
workQueue
是无界队列,那么这个参数就没效果
keepAliveTime(线程存活时间)
- 线程的工作线程空闲后保持存活的世界,默认情况下当线程池的线程数小于
corePoolSize
时生效。当allowCoreThreadTimeOut
设置为true时核心线程超时也会被回收 - 如果任务很多,每个任务执行时间比较短,可以适当调大时间,提高线程利用率
workQueue(任务队列)
- 用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
ArrayBlockingQueue
:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。LinkedBlockingQueue
:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue
。静态工厂方法Executors.newFixedThreadPool()
使用了这个队列。SynchronousQueue
:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue
,静态工厂方法Executors.newCachedThreadPool
使用了这个队列。priorityBlockingQueue
:一个具有优先级的无限阻塞队列。
threadFactory(线程工厂)
用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
使用开源框架guava提供的
ThreadFactoryBuilder
可以快速给线程池里的线程设置有意义的名字,代码如下。new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
handler(饱和策略)
当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy
,表示无法处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。
AbortPolicy
:直接抛出异常。CallerRunsPolicy
:调用任务的run()
方法绕过线程池直接执行。DiscardOldestPolicy
:丢弃队列里最近节点的一个任务,并执行当前任务。DiscardPolicy
:不处理,丢弃掉。
小结
- 如果当前运行的线程少于
corePoolSize
,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。 - 如果运行的线程等于或多于
corePoolSize
,则将任务加入BlockingQueue
。 - 如果无法将任务加入
BlockingQueue
(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。 - 如果创建新线程将使当前运行的线程超出
maximumPoolSize
,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()
方法。根据不同的拒绝策略去处理。
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。
核心源码剖析
处理线程池状态和工作队列的参数 - ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000-1(29个1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//用左边3位,实现5种线程状态
//111-0,,, 此状态表示线程池能接收新的任务,十进制为: -536,870,912
private static final int RUNNING = -1 << COUNT_BITS;
//000-0... 此此状态不接受新的任务,但是可以继续执行队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//001-0... 此状态全面拒绝,并中断正在处理的任务 十进制为: 536,870,912
private static final int STOP = 1 << COUNT_BITS;
//010-0... 此状态表示所有任务已经被终止
private static final int TIDYING = 2 << COUNT_BITS;
//011-0... 此状态表示已经清理完现场
private static final int TERMINATED = 3 << COUNT_BITS;
// ~掩码取反
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取工作线程数目
private static int workerCountOf(int c) { return c & CAPACITY; }
// 合并rs(工作状态)和wc(工作线程数) 或运算 到一个32位数中
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池状态及状态切换
RUNNING
:能接受新任务,并处理阻塞队列中的任务SHUTDOWN
:不接受新任务,但是可以处理阻塞队列中的任务STOP
:不接受新任务,并且不处理阻塞队列中的任务,并且还打断正在运行任务的线程,就是直接撂担子不干了!TIDYING
:所有任务都终止,并且工作线程也为0,处于关闭之前的状态TERMINATED
:已关闭。
execute方法
public void execute(Runnable command) {
//如果提交了空的任务 抛出异常
if (command == null)
throw new NullPointerException();
int c = ctl.get();//获取当前线程池的状态
//检查当前工作线程数量是否小于核心线程数量
if (workerCountOf(c) < corePoolSize) {
// 通过addWorker方法提交任务
if (addWorker(command, true))
return;
c = ctl.get();//如果提交失败 需要二次检查状态
}
//向工作线程提交任务,如果线程池为running,则将线程加入等待队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查状态
int recheck = ctl.get();
//如果重新检查发现线程池不为ruuning,就从队列中移除线程,并执行拒绝策略
if (!isRunning(recheck) && remove(command))
reject(command);
//如果之前的线程已经消费完,就新建一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果核心池和等待队列都满了,则尝试创建一个新线程
else if (!addWorker(command, false))
//如果addWork()返回false,则执行拒绝策略
reject(command);
}
Worker类
Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* 这里传入任务,并将Worker自己放入到thread中
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//
this.thread = getThreadFactory().newThread(this);
}
/** 后续工作线程执行的时候触发这里 */
public void run() {
runWorker(this);
}
}
addWorker方法
- 这个方法是任务提交的一个核心方法。在里面完成了状态检查、新建任务、执行任务等一系列动作。
/**
* 检查是否能添加一个新的任务线程,如果可以则创建并启动任务
* 返回false的可能性如下:
* 1.线程池不为RUNNING状态
* 2.线程工厂创建新的任务线程失败
* firstTask: 外部启动线程池时需要构造的第一个线程
*/
private boolean addWorker(Runnable firstTask, boolean core) {
//循环标记
retry:
//死循环更新状态
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//获取运行状态
//检查线程池是否处于关闭状态
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取当前工作线程数量
int wc = workerCountOf(c);
//如果已经超过corePoolSize获取maximumPoolSize 返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加一个工作线程
if (compareAndIncrementWorkerCount(c))
break retry;
//再次获取状态
c = ctl.get(); // Re-read ctl
//如果状态更新失败 则循环更新
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//初始化一个工作线程
final Thread t = w.thread;
if (t != null) {
// 获得全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加工作到hashset中保存
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 工作线程启动 执行第一个任务 就是新提交的任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
runWorker方法
在看
addWorker
时发现,当工作线程创建成功后,会调用t.start()
我们知道它实际执行的就是Worker
对象的run()
方法,而worker
的run()
方法是这样定义的:public void run() { runWorker(this); }
它实际上是将自己委托给线程池的
runWorker
方法,不断执行我们提交的任务的run方法。而这个任务可能是我们新提交的,也有可能是从等待队列中获取的。这样就实现了线程池的完成逻辑。final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //不断地从blockingQueue获取任务 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //执行beforeExecute方法 beforeExecute(wt, task); Throwable thrown = null; try { //调用Runable的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行aferExecute方法 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
自己动手实现一个简易线程池
线程池类
@Slf4j
public class SimpleThreadPoolExecutor {
private final BlockingQueue<Runnable> blockingQueue;
public SimpleThreadPoolExecutor(BlockingQueue<Runnable> blockingQueue, int threadSize) {
this.blockingQueue = blockingQueue;
Set<SimpleThread> workThreads = new HashSet<>();
for (int i = 0; i < threadSize; i++) {
SimpleThread simpleThread = new SimpleThread("simpleThread-" + i);
simpleThread.start();
workThreads.add(simpleThread);
}
}
public void execute(Runnable task) {
blockingQueue.add(task);
}
@Data
private final class SimpleThread extends Thread {
public SimpleThread(@NotNull String name) {
super(name);
}
@Override
public void run() {
while (true) {
try {
// 这里如果没有任务,会一直阻塞(底层调了LockSupport.park(this))
Runnable task = blockingQueue.take();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
log.error("【获取任务发生异常】, 错误信息 = {}", ExceptionUtils.getRootCauseMessage(e));
}
}
}
}
}
测试
public static void main(String[] args) {
SimpleThreadPoolExecutor executor =
new SimpleThreadPoolExecutor(new LinkedBlockingQueue<>(), Runtime.getRuntime().availableProcessors());
Random r = new Random();
for (int i = 0; i < 100; i++) {
executor.execute(() -> System.err.println(Thread.currentThread().getName() + ": 执行结果为" + r.nextInt(100)));
}
}
------------- 测试结果
simpleThread-0: 执行结果为51
simpleThread-1: 执行结果为55
simpleThread-4: 执行结果为74
simpleThread-7: 执行结果为21
simpleThread-4: 执行结果为36
simpleThread-1: 执行结果为57
simpleThread-6: 执行结果为81
......
几道关于线程池的问题及解答
问题一
如果线程数小于核心线程数,并且线程都处于空闲状态,现提交一个任务,是新起一个线程还是给之前创建的线程?
- 线程池会新起一个线程来执行这个新任务,不管老线程是否空闲。
问题二
如果线程池中的线程在执行任务的时候,抛异常了,会怎么样?
- 查看
runWorker
方法,如果发生异常会进入到afterExecute(task, thrown)
, 并且也继续被抛了出来 ,进入processWorkerExit(w, completedAbruptly)
方法 - 简而言之把这个线程废了,然后新建一个线程。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 移除了线程的引用, gc会处理
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 新加入一个线程
addWorker(null, false);
}
}
问题三
线程池的核心线程在空闲的时候一定不会被回收吗?
- 有个
allowCoreThreadTimeOut
方法,把它设置为 true ,则所有线程都会超时时间,不会有核心数那条线的存在。具体是会调用interruptIdleWorkers
这个方法
问题四
线程池如何动态修改核心线程数和最大线程数?
其实之所以会有这样的需求是因为线程数是真的不好配置。
你可能会在网上或者书上看到很多配置公式,比如:
- CPU 密集型的话,核心线程数设置为 CPU核数+1
- I/O 密集型的话,核心线程数设置为 2 * CPU核数
比如:线程数=CPU核数 *(1+线程等待时间 / 线程时间运行时间)
这个比上面的更贴合与业务,还有一些理想的公式就不列了。就这个公式而言,这个线程等待时间就很难测,拿 Tomcat 线程池为例,每个请求的等待时间能知道?不同的请求不同的业务,就算相同的业务,不同的用户数据量也不同,等待时间也不同。
所以说线程数真的很难通过一个公式一劳永逸,线程数的设定是一个迭代的过程,需要压测适时调整,以上的公式做个初始值开始调试是 ok 的。
再者,流量的突发性也是无法判断的,举个例子 1 秒内一共有 1000 个请求量,但是如果这 1000 个请求量都是在第一毫秒内瞬时进来的呢?