线程池之FutureTask
简介
Java线程池部分常用的执行器是ThreadPoolExecutor,依赖底层的Future接口,本章节先介绍实现Future接口用得最多的FutureTask作为铺垫。
FutureTask的作用
首先得提出一个问题,Java创建线程的方式有几种? 互联网上的答案真的是千奇百怪,抛开用户自己编写的JNI,答案只有一种,new Thread。
单纯的new Thread有什么痛点呢? 没有线程之间的共享变量,需要用户自己封装继承Thread的类或者传入实现Runnable的类。
Future接口主要解决两个痛点:
- 解决以上提及new Thread的痛点,解决方式Future继承了Runnable,作为参数传递Thread,暴露API查询变量;
- Future是一个异步框架的接口,方便异步编程。
FutureTask层次图
查看Future接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Future<V> {
// 在线程未start之前可以取消,若线程可以中断且mayInterruptIfRunning=true,此方法可中断执行中的线程
boolean cancel(boolean mayInterruptIfRunning);
// 是否在完成任务前被取消
boolean isCancelled();
// 是否完成任务
boolean isDone();
// 获取任务返回的结果,若未完成则阻塞等待直到完成为止,若任务抛异常则转换为ExecutionException抛出
V get() throws InterruptedException, ExecutionException;
// 在get()基础上增加阻塞超时
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
通过以上的API就能异步获取线程执行任务的状态和结果。
FutureTask源码分析
FutureTask数据结构
相关的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class FutureTask<V> implements RunnableFuture<V> {
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
// 计算中
private static final int COMPLETING = 1;
// 完成
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 执行的任务
private Callable<V> callable;
// 任务返回的值或者异常
private Object outcome; // non-volatile, protected by state reads/writes
// runner的运行线程
private volatile Thread runner;
// Treiber stack简单的FILO栈
private volatile WaitNode waiters;
// 单向链表
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
// CAS
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
}
FutureTask需要保存运行的线程因为cancel需要获取运行中的线程Object。 注意waiters字段是一个Treiber stack用CAS乐观锁解决并发冲突简单的栈,为什么需要waiters呢?
想象场景:FutureTask的结果是多个线程执行后面逻辑的必要条件,则此FutureTask运行未完成,其他线程调用get()的时候会阻塞,需要一个waiters链表记录阻塞的线程以待后续唤醒。
FutureTask逻辑源码
相关的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
// 返回结果或者抛异常
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
/* Callable接口是一个返回结果的任务
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
public boolean cancel(boolean mayInterruptIfRunning) {
// 只要state == NEW则代表任务还未完成,则NEW->INTERRUPTING or NEW->CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // 发出中断异常
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
// 最后的状态,禁止state的volatile关键字,小优化;
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
// 获取结果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
// 模板模式,子类实现
protected void done() { }
// 设置正常的返回值
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
// 最后的状态,禁止state的volatile关键字,小优化;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion();
}
}
// 设置异常
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
// 最后的状态,禁止state的volatile关键字,小优化;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion();
}
}
public void run() {
// 通过CAS绑定FutureTask和Thread并避免线程竞争,有可能多个new Thread(futureTask).start()竞争的场景
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state有可能被cancel
int s = state;
if (s >= INTERRUPTING)
// 自旋等待state由INTERRUPTING->INTERRUPTED
handlePossibleCancellationInterrupt(s);
}
}
private void handlePossibleCancellationInterrupt(int s) {
/* cancel方法中这小段代码state由INTERRUPTING->INTERRUPTED
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
这里等待这段代码处理完毕
*/
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
// 完成后的逻辑
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 清除waiters
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒阻塞的线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
// 链表tail则结束
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 子类实现
done();
callable = null; // to reduce footprint
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 完成/取消返回,若第一次循环则不入栈
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// COMPLETING代表执行完成只差结果的赋值则释放等待自己的线程
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 还没执行完且第一次循环则新建等待节点
else if (q == null)
q = new WaitNode();
// 还没执行完则把建立的节点放入栈等待
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 此线程的节点已经入栈,可以阻塞等待
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
// 效率很低的remove,因存在竞争
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) {
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// 忽略q.thread != null的节点
if (q.thread != null)
pred = q;
// q.thread == null证明 q == node 或者q节点已经完成/中断,需要移除
else if (pred != null) {
// 删除节点
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
// 能到这里代表 pred == null为第一次循环
// 把waiters引用新的head
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
continue retry;
}
break;
}
}
}
Future是需要跟创建线程配合的,调用runner和get()的线程是两个线程。简单异步流程如下: Trunner:runner = new Thread(Future)-> runner.start Tget: future = new Future(Callable)-> future.get()