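Thread Pools: ThreadPoolExecutor
Introduction
Having covered FutureTask and BlockingQueue in the earlier thread pool posts, we now move on to the main course: ThreadPoolExecutor.
ThreadPoolExecutor class hierarchy
ThreadPoolExecutor extends AbstractExecutorService, so let's first look at the main methods of AbstractExecutorService: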
public abstract class AbstractExecutorService implements ExecutorService {
    // Wrap the task in a new FutureTask
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    // Submit a task and return its FutureTask
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        // Hand the task to execute(), which the subclass implements
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
}
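AbstractExecutorService mainly wraps tasks in a Future and hands them to execute(). Its other methods, invokeAny and invokeAll, are likewise built on top of Future; interested readers can look at the source.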
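To see the submit -> newTaskFor -> execute chain in isolation, here is a minimal sketch. DirectExecutorService is a hypothetical class made up for this post (it is not part of the JDK): it runs every task on the caller's thread, so the only inherited logic left is the Future wrapping described above.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical executor for illustration only: runs each task on the caller's thread.
class DirectExecutorService extends AbstractExecutorService {
    private volatile boolean shutdown;

    // The one method that actually runs anything; the inherited submit() funnels into it.
    @Override
    public void execute(Runnable command) {
        command.run();
    }

    @Override public void shutdown() { shutdown = true; }
    @Override public List<Runnable> shutdownNow() { shutdown = true; return Collections.emptyList(); }
    @Override public boolean isShutdown() { return shutdown; }
    @Override public boolean isTerminated() { return shutdown; }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return shutdown; }

    // Submits a Callable through the inherited submit() and unwraps the Future.
    static int submitAndGet() {
        DirectExecutorService executor = new DirectExecutorService();
        Future<Integer> future = executor.submit(() -> 6 * 7);
        try {
            return future.get(); // already complete, because execute() ran it inline
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndGet()); // prints 42
    }
}
```

Nothing in this class creates a FutureTask itself; that comes entirely from AbstractExecutorService, which is the point of the base class.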
ThreadPoolExecutor data structures
The data-related code in ThreadPoolExecutor:
public class ThreadPoolExecutor extends AbstractExecutorService {
    // ctl is a 32-bit int: the high 3 bits hold the run state, the low 29 bits hold the worker count
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    /* RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     */
    // RUNNING: accepts new tasks and processes queued tasks
    private static final int RUNNING = -1 << COUNT_BITS;
    // SHUTDOWN: accepts no new tasks but still processes queued tasks
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    // STOP: accepts no new tasks, does not process queued tasks, interrupts running tasks
    private static final int STOP = 1 << COUNT_BITS;
    // TIDYING: all tasks have terminated; about to call the terminated() hook method
    private static final int TIDYING = 2 << COUNT_BITS;
    // TERMINATED: the terminated() hook method has completed
    private static final int TERMINATED = 3 << COUNT_BITS;
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    private static int workerCountOf(int c) { return c & CAPACITY; }
    // Pack a run state and a worker count into one ctl value
    private static int ctlOf(int rs, int wc) { return rs | wc; }
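    // workQueue is the blocking queue that holds tasks waiting to run
    private final BlockingQueue<Runnable> workQueue;
    // Lock guarding the workers set and its statistics. A concurrent collection could have been used,
    // but because of concurrency issues around shutdown() a single lock is used throughout.
    // See the English comment in the JDK source for details
    private final ReentrantLock mainLock = new ReentrantLock();
    // Set of all workers in the pool, used for thread bookkeeping
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // Condition that awaitTermination waits on during shutdown
    private final Condition termination = mainLock.newCondition();
    private int largestPoolSize;
    // Number of completed tasks
    private long completedTaskCount;
    // ------------------ Pool construction parameters ------------------
    // All volatile, so that every thread reads the latest value
    private volatile ThreadFactory threadFactory;
    // Called when the pool is saturated (no more threads or queue slots) or shut down
    private volatile RejectedExecutionHandler handler;
    // allowCoreThreadTimeOut == false: idle timeout for threads in excess of corePoolSize
    // allowCoreThreadTimeOut == true: idle timeout for all threads
    private volatile long keepAliveTime;
    // Defaults to false; the analysis in this post assumes false
    private volatile boolean allowCoreThreadTimeOut;
    // Core pool size
    private volatile int corePoolSize;
    // Maximum pool size
    private volatile int maximumPoolSize;
    // ------------------ Pool construction parameters ------------------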
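    // Worker extends AQS to implement a mutex, so that a graceful shutdown (a call to shutdown())
    // does not interrupt a task that is already running.
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;
        // Thread this worker runs in; worker and thread are one-to-one
        final Thread thread;
        // First task to run, set when addWorker creates the worker. May be null
        Runnable firstTask;
        // Number of tasks completed by this worker
        volatile long completedTasks;
    }
}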
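A ThreadPoolExecutor is initialized from corePoolSize, maximumPoolSize, keepAliveTime, workQueue and handler. Based on these parameters it creates threads, wraps each one in a Worker, and manages the workers dynamically. workQueue is a blocking queue, and the choice of queue has a very large effect on how ThreadPoolExecutor runs, so make sure you understand it.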
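The ctl packing is easier to trust once you have played with it. The sketch below copies the constants and the three helper functions from the source above into a standalone class (CtlSketch is a name invented here, since the originals are private) so that the bit layout can be checked directly.

```java
// Standalone copy of the ctl bit-packing helpers, for experimentation only.
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set

    // Run states live in the high 3 bits; RUNNING is the only negative one.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(workerCountOf(c));          // prints 5
        System.out.println(runStateOf(c) == RUNNING);  // prints true
        // The states are numerically ordered, which is why the source can write rs >= SHUTDOWN.
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED); // prints true
    }
}
```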
workQueue implementations
In theory any BlockingQueue implementation can serve as the workQueue. The JDK source recommends three: SynchronousQueue, LinkedBlockingQueue and ArrayBlockingQueue. See the earlier post on BlockingQueue.
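What matters to the pool is how each queue answers a non-blocking offer(), because that is the call execute() makes. A small sketch, with a made-up helper named accepted, that offers tasks to each queue while no consumer is waiting:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueOfferSketch {
    // Offers `attempts` no-op tasks with no consumer present and counts how many were accepted.
    static int accepted(BlockingQueue<Runnable> queue, int attempts) {
        int ok = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(() -> { })) {
                ok++;
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        // No thread is waiting in take()/poll(), so every hand-off fails.
        System.out.println(accepted(new SynchronousQueue<>(), 5));     // prints 0
        // Bounded: accepts until the capacity of 2 is reached.
        System.out.println(accepted(new ArrayBlockingQueue<>(2), 5));  // prints 2
        // Default capacity is Integer.MAX_VALUE, so it keeps accepting.
        System.out.println(accepted(new LinkedBlockingQueue<>(), 5));  // prints 5
    }
}
```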
Source code walkthrough
Submitting a task
We start from a call to submit, which wraps the task in a FutureTask and calls execute. The relevant source:
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // If the worker count is below corePoolSize, call addWorker; addWorker can fail
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // On failure, re-read ctl
        c = ctl.get();
    }
    // Pool is running and workQueue.offer succeeds (offer never blocks; it returns true or false immediately)
    // ArrayBlockingQueue: succeeds unless the queue is full
    // LinkedBlockingQueue: if bounded, same as ArrayBlockingQueue; if unbounded, always succeeds
    // SynchronousQueue: fails unless a worker is already blocked waiting on the queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // The pool was RUNNING and the task has been added to workQueue.
        // If recheck is no longer RUNNING, a concurrent shutdown happened: remove the task from workQueue and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // Reaching here normally means the pool is still RUNNING and the task is in workQueue
        // With allowCoreThreadTimeOut=true, or when core threads die from exceptions, the worker count
        // can drop below corePoolSize, even to 0, by the time we get here
        else if (workerCountOf(recheck) == 0)
            // Add a worker without a first task; the task is already in workQueue
            addWorker(null, false);
    }
    // If the state has become SHUTDOWN, addWorker(command, false) fails and the task is rejected
    // With a SynchronousQueue this may create a new worker (creation fails once maximumPoolSize is reached)
    else if (!addWorker(command, false))
        reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // STOP, TIDYING and TERMINATED all return false
        // SHUTDOWN returns false if firstTask != null or workQueue is empty
        // Reason: after shutdown() the worker count drops, and processWorkerExit calls
        // addWorker(null, false) to create a replacement worker
        // SHUTDOWN means no new tasks are accepted, but queued tasks must still run,
        // so that case must not return false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // workerCount++; leave both loops only when the CAS succeeds
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the worker and its thread; the thread is not started yet
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Re-read the state after acquiring the lock
                int rs = runStateOf(ctl.get());
                // Proceed if RUNNING, or if SHUTDOWN with firstTask == null (queued tasks still need a worker)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // huh? this thread should not have been started yet
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // Mark the add as successful
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // Roll back: remove w from workers and workerCount--
            addWorkerFailed(w);
    }
    return workerStarted;
}
Believe me, these comments bring tears to my eyes too: the code is full of SHUTDOWN state transitions, state checks and concurrency concerns :(
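The three branches of execute() can be observed from the outside. The sketch below is my own demo with arbitrary sizes (core 1, max 2, queue capacity 1): it submits four tasks that all block on a latch, so the first goes to a core worker, the second to the queue, the third to a non-core worker, and the fourth is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowSketch {
    // Returns {pool size, queue size, rejected count} after submitting four blocking tasks.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the snapshot is taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++; // default AbortPolicy: queue full and pool at maximumPoolSize
            }
        }
        int[] snapshot = { pool.getPoolSize(), pool.getQueue().size(), rejected };
        release.countDown();
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = run();
        System.out.println(s[0] + " workers, " + s[1] + " queued, " + s[2] + " rejected");
        // prints: 2 workers, 1 queued, 1 rejected
    }
}
```

Note the order: the queue fills before the pool grows past corePoolSize, which surprises many people the first time.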
The life of a Worker
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    // "Sudden death" flag: stays true if the loop is left because of an exception
    boolean completedAbruptly = true;
    try {
        // task = getTask() may block; a worker blocked there is idle, and shutdown() can interrupt it to wake it up
        while (task != null || (task = getTask()) != null) {
            // Worker lock: shutdown() calls tryLock before interrupting and skips the worker if that fails
            // shutdownNow() interrupts unconditionally; whether that stops task.run() depends on
            // whether the task responds to interruption
            w.lock();
            // Tricky: if the pool is stopping, make sure the thread is interrupted; if not, make sure
            // the interrupt flag is cleared. The second STOP check covers what the JDK comment calls
            // the "shutdownNow race while clearing interrupt"
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                // hook method
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Run the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // hook method
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                // Release the worker lock
                w.unlock();
            }
        }
        // Not reached if an exception was thrown, which is what "sudden death" means
        completedAbruptly = false;
    } finally {
        // Clean up after the dying worker
        processWorkerExit(w, completedAbruptly);
    }
}
private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // SHUTDOWN with an empty workQueue: return null
        // STOP or later: return null right away
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // Returning null means the worker is about to die, so workerCount--
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // Can workerCount really exceed maximumPoolSize?
        // Yes: setMaximumPoolSize can lower maximumPoolSize at runtime and interrupts workers
        // blocked in getTask, so the excess workers are retired here
        // timed && timedOut means keepAliveTime has elapsed and the worker should be retired
        //public void setMaximumPoolSize(int maximumPoolSize) {
        //    this.maximumPoolSize = maximumPoolSize;
        //    if (workerCountOf(ctl.get()) > maximumPoolSize)
        //        interruptIdleWorkers();
        //}
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // workerCount--
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // r == null here: the timed poll expired
            timedOut = true;
        } catch (InterruptedException retry) {
            // Interrupted while blocked; loop and re-check the state
            timedOut = false;
        }
    }
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // Sudden death: workerCount--. In the normal case getTask() has already done this, so no worries
    if (completedAbruptly)
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // This worker is about to die
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // Try to move to the TIDYING state
    tryTerminate();
    int c = ctl.get();
    // RUNNING or SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        // Not a sudden death: getTask simply returned null in runWorker
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // The worker count is already at least min, so no replacement is needed
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // One worker falls and another takes its place: keep at least min threads alive
        addWorker(null, false);
    }
}
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // Give up and return if RUNNING, if another thread is already terminating (TIDYING or later),
        // or if SHUTDOWN with tasks still in the queue
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // hook method
                    terminated();
                } finally {
                    // Set the TERMINATED state
                    ctl.set(ctlOf(TERMINATED, 0));
                    // Wake up threads in awaitTermination; applications call that method
                    // to learn that the pool has fully terminated
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
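The afterExecute hook in runWorker is the easiest place to see a "sudden death" from application code. The following is a sketch of my own (the class name and the quiet uncaught-exception handler are choices made for the demo): it overrides afterExecute and runs a task that throws through execute(). With submit() the result would differ, because FutureTask captures the exception and thrown arrives as null.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AfterExecuteSketch {
    // Runs one failing task through execute() and returns what afterExecute observed.
    static Throwable observe() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                r -> {
                    Thread t = new Thread(r, "sketch-worker");
                    // Demo only: swallow the rethrown exception so it does not print a stack trace.
                    t.setUncaughtExceptionHandler((thread, ex) -> { });
                    return t;
                }) {
            @Override
            protected void afterExecute(Runnable task, Throwable thrown) {
                super.afterExecute(task, thrown);
                seen.set(thrown); // non-null only when the task was started via execute()
                done.countDown();
            }
        };
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints java.lang.IllegalStateException: boom
    }
}
```

After the hook returns, the exception propagates out of runWorker, completedAbruptly stays true, and processWorkerExit may start a replacement worker.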
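Killing Workers from the application
The application that owns the pool can kill its threads. As the life of a Worker shows, this only works if the task responds to interruption; otherwise the task simply keeps running.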
// Graceful shutdown
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Security permission check; can be skipped
        checkShutdownAccess();
        // Set the SHUTDOWN state
        advanceRunState(SHUTDOWN);
        // Interrupt idle workers, i.e. those blocked in getTask
        interruptIdleWorkers();
        onShutdown(); // hook method
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // A worker that is running a task holds its own lock, so tryLock fails and it is skipped
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Security permission check; can be skipped
        checkShutdownAccess();
        // Set the STOP state
        advanceRunState(STOP);
        // Interrupt every worker, no exceptions
        interruptWorkers();
        // Drain the queue; the tasks that never ran are returned to the caller
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
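That covers most of the thread pool source. Reading the source and its comments a few more times is the best way to understand it. Next comes the most important part: putting it to use.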
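To make the effect of shutdownNow() concrete, here is a small sketch of my own, assuming a single-worker pool with an unbounded queue. One task blocks in an interruptible call while two more wait in the queue; shutdownNow() should hand back the two queued tasks and interrupt the running one.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowSketch {
    // Returns {number of tasks handed back by shutdownNow(), 1 if the running task was interrupted else 0}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1); // never counted down on purpose
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                never.await();           // interruptible blocking call
            } catch (InterruptedException e) {
                interrupted.set(true);   // the interrupt from shutdownNow() arrived
            }
        });
        pool.execute(() -> { });         // stays in the queue behind the blocked task
        pool.execute(() -> { });         // stays in the queue as well
        try {
            started.await();             // make sure the first task is really running
            List<Runnable> pending = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] { pending.size(), interrupted.get() ? 1 : 0 };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " tasks returned, interrupted=" + (r[1] == 1));
        // prints: 2 tasks returned, interrupted=true
    }
}
```

Replace shutdownNow() with shutdown() and the blocked task is left alone, because its worker holds its own lock and interruptIdleWorkers skips it; the pool then never terminates in this demo, which is exactly the "task must respond to interruption" caveat.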
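Configuring a thread pool
Knowing the internals is only useful if it helps us use the pool well. Back to the original goal: how should a thread pool be configured? Here is the ThreadPoolExecutor constructor with the full parameter list: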
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
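The parameter descriptions below are my own understanding and may differ from what you find elsewhere online.
- corePoolSize: the minimum number of Workers kept alive (except when allowCoreThreadTimeOut=true)
- maximumPoolSize: the maximum number of Workers the pool can grow to
- keepAliveTime: how long a Worker may stay idle (blocked in getTask) before it is retired
- unit: the time unit of keepAliveTime
- workQueue: the blocking queue that holds pending tasks
- threadFactory: the factory that creates the worker threads
- handler: what to do when a task cannot be added to the pool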
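As a concrete reference, here is one way to call the seven-argument constructor. All values, the "demo-pool-" thread name prefix and the choice of CallerRunsPolicy are placeholders I picked for the example, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConstructionSketch {
    static ThreadPoolExecutor newPool() {
        AtomicInteger seq = new AtomicInteger();
        // Naming threads makes thread dumps and logs much easier to read.
        ThreadFactory named = r -> new Thread(r, "demo-pool-" + seq.incrementAndGet());
        return new ThreadPoolExecutor(
                2,                                          // corePoolSize
                4,                                          // maximumPoolSize
                30L, TimeUnit.SECONDS,                      // keepAliveTime and unit
                new ArrayBlockingQueue<>(100),              // workQueue (bounded)
                named,                                      // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler
    }

    // Runs one task and reports the name of the thread that executed it.
    static String workerName() {
        ThreadPoolExecutor pool = newPool();
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            return pool.submit(whoAmI).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName()); // prints demo-pool-1
    }
}
```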
First, look at three static factory methods in Executors.java that return a ThreadPoolExecutor:
// All three methods use the default handler, new AbortPolicy()
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    // workQueue = new LinkedBlockingQueue<Runnable>(): unbounded, i.e. capacity Integer.MAX_VALUE
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    // workQueue = new LinkedBlockingQueue<Runnable>(): unbounded, i.e. capacity Integer.MAX_VALUE
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    // maximumPoolSize = Integer.MAX_VALUE
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
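My conclusion up front: do not use any of these three methods in production. Each of them involves Integer.MAX_VALUE, either as the queue capacity or as the maximum thread count, so under sustained load they can exhaust the server's resources, which is fatal. Below are my own views on configuring ThreadPoolExecutor.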
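The Integer.MAX_VALUE limits can be read back from the pools themselves. This sketch relies on an assumption: that the objects returned by newFixedThreadPool and newCachedThreadPool can be cast to ThreadPoolExecutor. That matches the source quoted above, but the ExecutorService return type does not promise it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FactoryDefaultsSketch {
    // Remaining capacity of the (empty) work queue behind newFixedThreadPool.
    static int fixedQueueCapacity() {
        ExecutorService fixed = Executors.newFixedThreadPool(2);
        try {
            // Assumption: the cast works because the factory returns a plain ThreadPoolExecutor.
            return ((ThreadPoolExecutor) fixed).getQueue().remainingCapacity();
        } finally {
            fixed.shutdown();
        }
    }

    // Upper bound on the number of threads behind newCachedThreadPool.
    static int cachedMaximumPoolSize() {
        ExecutorService cached = Executors.newCachedThreadPool();
        try {
            return ((ThreadPoolExecutor) cached).getMaximumPoolSize(); // same cast assumption
        } finally {
            cached.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fixedQueueCapacity() == Integer.MAX_VALUE);    // prints true
        System.out.println(cachedMaximumPoolSize() == Integer.MAX_VALUE); // prints true
    }
}
```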
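Two concepts need explaining first:
- CPU-bound: heavy computation; the bottleneck is the CPU;
- IO-bound: lots of IO and blocking, such as database, Redis or network calls; the bottleneck is IO.
For CPU-bound work, keep the thread count close to the number of CPUs. For IO-bound work most threads are blocked most of the time, so the thread count can be tens of times the number of CPUs.
The default handler is AbortPolicy, which throws RejectedExecutionException. This is an unchecked (runtime) exception, so the IDE will not remind you to handle it; the calling code has to catch it deliberately.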
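When tasks are CPU-bound:
- corePoolSize: between 0.5x and 1x the number of CPUs;
- maximumPoolSize: between 1x and 2x the number of CPUs;
- keepAliveTime: can be set somewhat higher, so the pool responds quickly to occasional bursts of concurrency;
- workQueue: if fairness is not a concern, choose a LinkedBlockingQueue with a relatively large capacity to buffer tasks;
- handler: implement your own RejectedExecutionHandler that logs the rejection; in specific cases it can resubmit the task after a delay.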
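A possible starting point for the CPU-bound settings above, as a sketch only. The factor of 2, the 60-second keepAliveTime and the queue capacity are assumptions to tune, and the handler here just logs and counts; the delayed resubmission mentioned above is left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CpuBoundPoolSketch {
    // Builds a pool sized from a CPU count; rejections are logged and counted instead of thrown.
    static ThreadPoolExecutor newPool(int cpus, int queueCapacity, AtomicInteger rejected) {
        RejectedExecutionHandler logAndCount = (task, pool) -> {
            rejected.incrementAndGet();
            System.err.println("task rejected, queue size=" + pool.getQueue().size());
        };
        return new ThreadPoolExecutor(
                cpus,                                     // corePoolSize: about 1x CPUs
                2 * cpus,                                 // maximumPoolSize: about 2x CPUs
                60L, TimeUnit.SECONDS,                    // fairly generous keepAliveTime
                new LinkedBlockingQueue<>(queueCapacity), // large but bounded buffer
                logAndCount);
    }

    // Overloads a deliberately tiny pool (1 CPU, queue of 1) and returns the rejection count.
    static int rejectionsUnderOverload() {
        AtomicInteger rejected = new AtomicInteger();
        ThreadPoolExecutor pool = newPool(1, 1, rejected);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        return rejected.get();
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = newPool(cpus, 1000, new AtomicInteger());
        System.out.println(pool.getCorePoolSize() + " core, " + pool.getMaximumPoolSize() + " max");
        pool.shutdown();
        System.out.println(rejectionsUnderOverload()); // prints 2
    }
}
```

In the overload demo, one task goes to the core worker, one to the queue and one to the second worker; the remaining two reach the handler.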
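When tasks are IO-bound:
- corePoolSize: between 0.5x and 1x the number of CPUs;
- maximumPoolSize: tens or even hundreds of times the number of CPUs;
- keepAliveTime: should be as small as practical, so the application reclaims thread resources quickly;
- workQueue: choose SynchronousQueue for low latency; the pool can grow quickly to maximumPoolSize to raise concurrency, since most Workers are blocked on IO;
- handler: implement your own RejectedExecutionHandler that logs the rejection; in specific cases it can resubmit the task after a delay.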
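And a matching sketch for the IO-bound case. The multiplier and the 5-second keepAliveTime are assumptions, and the latch stands in for a blocking IO call. With a SynchronousQueue nothing is buffered, so every task that finds no idle worker starts a new thread until maximumPoolSize is reached.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class IoBoundPoolSketch {
    // Core size near the CPU count, maximum a multiple of it, short keepAliveTime, no buffering.
    static ThreadPoolExecutor newPool(int cpus, int multiplier) {
        return new ThreadPoolExecutor(
                cpus, cpus * multiplier,
                5L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }

    // Submits `tasks` blocking tasks to a pool with core 2 and max 8, and returns the pool size.
    static int poolSizeUnderLoad(int tasks) {
        ThreadPoolExecutor pool = newPool(2, 4);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // stand-in for a blocking IO call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize();
        release.countDown();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println(poolSizeUnderLoad(8)); // prints 8
    }
}
```

A ninth task in this demo would be rejected, so with this queue the handler matters even more than in the CPU-bound setup.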