网站首页 > 技术文章 正文
构造函数参数介绍
corePoolSize:核心线程数大小。线程池会维持该数量的线程处理任务或者等待消费任务队列中的任务,需要注意的是核心线程不是在线程池创建时立即构建的,而是随着线程池处理任务的提交增加的,即提交一个任务到线程池中,线程池判断当前核心线程数小于设置的corePoolSize,则直接新建一个线程,并将该任务作为该工作线程的首个任务,直到达到核心线程数的工作线程数量为止。
maximumPoolSize:线程池允许的最大工作线程数量。首先我们要知道,除了初始时,核心线程数量未达到corePoolSize设置的数量时,是直接将任务放到新建的核心工作线程中的,其他的任务 都是需要添加到线程池的任务队列中等待线程池中的工作线程消费的,而在任务队列放任务失败时(任务队列已满),会在线程池中线程数不超过maximumPoolSize数量情况下,选择创建工作线程将任务放入,以增加任务的消费速度,maximumPoolSize就是限制这种临时用于加快消费的工作线程的数量的值,因为并不是线程越多消费的越快。
keepAliveTime+TimeUnit:限定了非核心线程在没有任务可供处理时的最大存活时间,如果设为1min,那么就是某非核心线程等待从工作队列中获取任务处理,超时时间1min,如果1min内还是没有任务,则销毁,这主要是因为工作线程的创建启动也需要时间,有可能在任务处理完毕后过一小段时间,又有大批任务等待处理,此时又可能需要再创建非核心线程,那么为了避免这种情况,选择让非核心线程等待一段时间,如果这段时间内无任务,再销毁,也就有了这两个参数。
BlockingQueue workQueue:任务队列,从BlockingQueue类型可以看出就是fifo的阻塞队列,用来放置等待工作线程处理的任务的队列,工作线程任务除了创建时的首个任务,就是从该队列中取。
threadFactory:线程工厂,用于创建新线程。默认的线程工厂有统一的命名格式“pool-线程池编号-thread-线程编号”,其中线程池和线程编号都是累加的一个数值。
RejectedExecutionHandler:拒绝策略,即在线程池的核心线程已经构建完毕,任务放到任务队列失败,此时又新建非核心线程失败的情况下(即任务已经没有地方可以存放),就会执行失败策略。失败策略有四种分别为
- AbortPolicy(默认策略)抛出异常RejectedExecutionException
- CallerRunsPolicy通过当前线程执行任务。
- DiscardOldestPolicy 从任务队列中扔掉一个任务,再执行本任务。
- DiscardPolicy 不做任何操作
部分字段介绍
线程池以一个int字段记录了线程池的当前状态和工作线程个数,其中高三位记录状态,低29位记录线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//接受新任务并且继续处理阻塞队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
//不接受新任务但是会继续处理阻塞队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不接受新任务,不再执行阻塞队列中的任务,中断正在执行的任务
private static final int STOP = 1 << COUNT_BITS;
//所有任务都已经完成,线程数都被回收,线程会转到TIDYING状态会继续执行钩子方法
private static final int TIDYING = 2 << COUNT_BITS;
//钩子方法执行完毕
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
//取c的高3位,因为RUNNING这些状态都是int的高3位表示,包括符号位,(RUNNING=101,SHUTDOWN=000,STOP=001,TIDYING=010,TERMINATED=011)
//CAPACITY=(1 << 29) - 1=二进制的29个1,所以~CAPACITY=高三位为111,所以c & ~CAPACITY==取c的高三位
private static int runStateOf(int c) { return c & ~CAPACITY; }
//c的低29位 用于存储工作线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
1234567891011121314151617181920212223
执行相关代码解析
从execute源码,可以看到一个任务进入线程池后的处理逻辑,
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取运行中的线程数
int c = ctl.get();
//如果运行中线程数小于核心线程数,则新增addworker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果线程池处于运行状态,且新增任务到工作队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果已经不在运行状态,且从任务队列中删除成功
if (! isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);
//线程池处于运行状态,但是没有工作线程,则新增工作线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//向线程池新增工作线程失败,则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
1234567891011121314151617181920212223242526
addWorker为核心工作线程和非核心工作线程的新增逻辑,
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//正常情况下是只要线程池状态大于等于shutdown状态,就不新增工作线程了,
//但需要排除一种情况就是 shutdown状态下 工作线程非空 又因为不能新增任务 因而firstask要为null
//也就是排除 (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty())所以非
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//如果工作线程已经大于等于最大容量或者 超过核心或最大线程数(取决于当前要新增的线程类型),则结束返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//比较更新工作线程+1,成功则跳出彻底循环
if (compareAndIncrementWorkerCount(c))
break retry;
//再次取线程池状态比对,如果不同,说明线程池状态已经被其他线程修改,需要跳到主循环中再次取值判断线程池状态,因而continue retry
//否则cas失败的原因应该只是工作线程数量的变更,因而选择再次自旋获取当前工作线程数量 重试
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//走到这,说明已经满足了工作线程新增条件,只需要新增工作线程 并加入到 工作线程池 中 启动 即可
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建工作线程,并将首任务放入
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加锁,防止并发访问workers对象
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//线程t正常情况下只是刚新建,还未启动的状态,所以如果是alive状态,是非法的
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将工作线程加入到容器workers中
workers.add(w);
int s = workers.size();
//largestPoolSize用于记录存储的最大工作线程数量
if (s > largestPoolSize)
largestPoolSize = s;
//工作线程添加成功的标志,尽可能将同步中的处理减少,即在同步中的都是必要同步的处理,
//mainlock锁定只需要保证 对于工作线程workers的添加和大小的获取处理,至于线程的启动 可以延后,
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//根据上面同步访问workers添加的结果,判断 如果添加成功 则可以启动工作线程t
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//根据工作线程启动结果,判断是否执行添加工作线程失败方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
从上述代码可以看到一个工作任务的进入处理逻辑,但是却没有真正消费的处理,而这数隐含在Worker内部类中的。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}
从上面可以看到,addworker执行run方法,实际是执行的 runWorker(this);方法
线程池任务消费逻辑
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//拿到worker中存储的首个任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果task不为空,则直接处理task,这用于对首个任务的处理
//task为空,则getTask()从任务队列中取
//需要注意的是工作线程核心线程的保持,就是依靠于 getTask()中任务的阻塞获取
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//runStateAtLeast(ctl.get(), STOP)即线程池状态为Stop及以上状态(STOP,TIDYING,TERMINATED)
//而且当前线程的状态还未终止,则调用interrupt终止
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行任务前的执行方法,可以通过子类重写
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//与beforeExecute类似,一前一后
afterExecute(task, thrown);
}
} finally {
//清空task,下次循环 再取
task = null;
//记录完成的任务数量
w.completedTasks++;
//解锁,允许中断
w.unlock();
}
}
completedAbruptly = false;
} finally {
//线程销毁逻辑
processWorkerExit(w, completedAbruptly);
}
}
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//自旋
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//检查线程池状态,首先rs >= SHUTDOWN,限定了只有在shutdown及以上状态,才需要判断后面的逻辑,如果是running状态,则直接跳过
//shutdown及以上状态中,如果是rs >= STOP状态,则不消费,工作线程数-1,并返回空任务,
//而workQueue.isEmpty()只有在rs== SHUTDOWN时才需要判断,即shutdown状态,且任务队列为空,那么也没有需要消费的了
//即相当于rs >= STOP||(rs== SHUTDOWN&&workQueue.isEmpty())但这么写 在running状态下就需要判断两个条件了。效率低了
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//如果设置了核心线程允许超时 或者是 线程数量 超过了核心线程数,则需要走从任务队列取时有超时时间的逻辑
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果工作线程数量 超过了最大工作线程数 或者 设置了超时且 已经在上次取任务时超时过了
//并且任务线程数量大于1或者任务队列数量是空的
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//如果cas失败,说明有其他线程也在进行该逻辑,就需要再次自旋判断
//避免了有两个线程拿到了一个c值,同时进入减1操作的逻辑
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//根据是否需要设置取任务时的超时时间,判断从任务队列取任务的方法是一直阻塞还是有超时时间的阻塞获取
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//r != null即取到了任务,则直接返回,否则就是没取到任务,
if (r != null)
return r;
//走到这说明没取到任务,那么就是本次是设置了超时时间的任务获取,但是在时限范围内未获取到任务,则设置timedout=true,自旋
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//获得线程池锁
mainLock.lock();
try {
checkShutdownAccess();
//修改线程池状态至少shutdown及以上状态
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历hashset中所有的工作线程worker
for (Worker w : workers) {
Thread t = w.thread;
//如果线程未被终止,获取锁,需要注意的是 用的tryLock,即尝试cas,失败则不重试
//因而,在runworker中获取了任务和锁的线程不会立即停止,
//而是在下次获取任务的gettask中获取任务前的检查中感知到线程池状态在进行退出
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//根据onlyone判断是否只是终止一个线程
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
以上为个人对于源码的理解和注释,如有错误或疑问,欢迎一起讨论,谢谢!
- 上一篇: 心跳与超时:高并发高性能的时间轮超时器
- 下一篇: springboot-如何配置线程池实现定时任务
猜你喜欢
- 2024-11-21 Java 线程池 | ThreadPoolExecutor 原理分析
- 2024-11-21 面试侃集合 | DelayQueue篇
- 2024-11-21 springboot-如何配置线程池实现定时任务
- 2024-11-21 心跳与超时:高并发高性能的时间轮超时器
- 2024-11-21 Java线程池ThreadPoolExecutor 线程池
- 2024-11-21 8 个线程池的深渊巨坑,使用不当直接生产事故!!!
- 2024-11-21 java线程池核心类ThreadPoolExecutor的应用
- 2024-11-21 Java线程池:ExecutorService 开发入门
- 2024-11-21 面试官:你给我说一下什么是时间轮吧?
- 2024-11-21 定时任务实现的关键DelayQueue延迟队列
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)