并发工具Semaphore CountDownLatch CyclicBarrier
简介
讲解三个JUC的三个工具Semaphore CountDownLatch CyclicBarrier,本质都是基于AbstractQueuedSynchronizer简称AQS AQS的知识可以查看互联网的资料或者我之前分析的JUCL之梳理(一)
Semaphore
Semaphore内部类继承AQS,是最简单的AQS共享锁运用。 初始化N个permit,获取锁-1,释放锁+1,当permit=0则达到非共享状态阻塞。 Semaphore也有公平锁和非公平锁,原理是判断AQS链表的是否有有效数据。 这里只讲解非公平锁相关的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class Semaphore implements java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 初始化AQS的state为permit的总数
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
// 自旋+CAS
for (;;) {
int available = getState();
int remaining = available - acquires;
// remaining < 0 失败 remaining >= 0成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 实现AQS的tryReleaseShared
protected final boolean tryReleaseShared(int releases) {
// 自旋+CAS
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 实现AQS的tryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
// 传入permits,默认为非公平锁
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
// 直接调用AQS的共享锁获取API
sync.acquireSharedInterruptibly(permits);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
// 直接调用AQS的共享锁释放API
sync.releaseShared(permits);
}
}
CountDownLatch
CountDownLatch内部类继承AQS,是一个计数器,调用AQS共享锁API。 初始化计数为N,调用AQS获取锁则park阻塞,直到调用N次AQS释放锁才unpark阻塞的线程。 源码非常简单,只列出锁相关代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class CountDownLatch {
// 构造函数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 初始化AQS的state
Sync(int count) {
setState(count);
}
// state不等于0都获取锁失败,默默阻塞去吧!
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 全部锁释放完才return true
return nextc == 0;
}
}
}
private final Sync sync;
// 封装AQS获取共享锁API
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 封装AQS获取释放锁API
public void countDown() {
sync.releaseShared(1);
}
}
CyclicBarrier
CyclicBarrier依赖的是最常用的锁ReentrantLock,利用condition的await和signal对线程添加和删除线程屏障。 初始化一定的屏障,当最后一个屏障被添加的时候执行指定的任务(若有),并唤醒所有阻塞的线程。
流程相关的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public class CyclicBarrier {
private static class Generation {
boolean broken = false;
}
private Generation generation = new Generation();
// 还有多少个线程在阻塞等待
private int count;
// 唤醒所有阻塞的线程并重置屏障
private void nextGeneration() {
// 调用AQS Condition的signalALL把单向链表的节点放入AQS的双向链表中依次唤醒
trip.signalAll();
count = parties;
generation = new Generation();
}
// 异常处理
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 进入屏障,屏障数量-1
int index = --count;
// 最后一个屏障触发传入构造函数的任务
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 唤醒其他线程并重置
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
// 调用AQS Condition的await放入Conditon的单项链表阻塞等待
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 触发中断
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
// 异常处理
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
// 有指定任务的构造函数
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
// 无指定任务的构造函数
public CyclicBarrier(int parties) {
this(parties, null);
}
// 添加屏障
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
}
CyclicBarrier经常和CountDownLatch做比较,因为他们都可以用在并发测试单元模块…… 其实分析了原理这两个类还是很不一样的
- 原理上CyclicBarrier是Conditon的await和signalAll,CountDownLatch是共享锁,都能在达到一定情况唤醒线程并传播;
- CyclicBarrier是线程级别的,CountDownLatch是非线程级别,区别就是一个线程只有一个屏障,但可以多次计数;
- CyclicBarrier添加完最后屏障可以执行指定任务,且可以重置状态继续利用,而CountDownLatch则不行。
我一般不建议使用CyclicBarrier,因为CountDownLatch从效率和灵活度而言优于CyclicBarrier。 例如有这么一个需求,在单线程等待N个任务完成,CyclicBarrier是无法做到的,而CountDownLatch在等待的线程调用await,处理的线程调用N次countDown就OK。