JUCL之梳理(二)
简介
继续JUCL之梳理(一),重点讲解ReentranLock和ReentrantReadWriteLock的原理。
ReentranLock
ReentranLock是可重入的独占锁,实现了Lock接口,即可以使用await和signal等方法,内部类NonfairSync/FairSync继承AQS。本质是实现Lock接口的方法,方法内部调用AQS的API。
这里涉及到公平锁/非公平锁的概念。公平锁类似先进先出,反之则是非公平锁。 根据JUCL的梳理(一)知道AQS是一个双向链表,出现非公平锁的原因只有在释放锁的时刻有新的锁获取了,导致双向链表等待的节点无法获取锁而继续park。
ReentranLock关键代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public class ReentrantLock implements Lock, java.io.Serializable {
// 默认构造函数是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// ReentrantLock.lock的入口
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
// 当前线程作为独占标识
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 还没获取锁,存在竞争,CAS更新AQS状态
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 已经获取锁的线程继续可以重入获取锁
else if (current == getExclusiveOwnerThread()) {
// 重入状态+acquires
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 独占锁重入过程中,不需考虑其他线程竞争,不用CAS
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 走AQS流程调用tryAcquire
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 对比NonfairSync的lock少了直接CAS改变state逻辑,强制走AQS逻辑调用tryAcquire
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 对比NonfairSync多了!hasQueuedPredecessors()判断AQS双向链表上是否没有效数据
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
ReentranLock特性:
- 通过AQS的state>0则代表有线程获取锁,当前线程重入锁state + 1;
- 公平锁和非公平锁差别仅仅在获取锁的步骤多加上判断AQS链表的是否有有效数据,高并发下公平锁吞吐量远远低于非公平锁
我觉得通过上一章的AQS梳理和以上注释基本可以清楚ReentrantLock的原理了。
ReentrantReadWriteLock
ReentrantReadWriteLock实现ReadWriteLock接口,ReadWriteLock接口包含了两个方法readLock和writeLock,故ReentrantReadWriteLock内部有两把锁。 ReadLock是可重入共享锁,多个读操作可以共同获取锁,简称RLock。 WriteLock是可重入独占锁,简称WLock, RLock和WLock均有公平锁和非公平锁,原理与ReentrantLock一致,默认构造函数均非公平锁。
ReentrantReadWriteLock竞争情况:
- 当RLock获取锁后,WLock会进入AQS park流程,等待WLock之前的所有RLock释放;
- 当WLock获取锁后,RLock会进入AQS park流程,等待WLock释放;
- 当WLock获取锁后,其他线程WLock会进入AQS park流程,等待WLock释放;
ReentrantReadWriteLock数据结构相关代码,请配合我的注解查看:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
// 读锁和写锁共用AQS的state,高16位为RLock共享数量,低16位为读WLock重入数量
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
static final class HoldCounter {
int count = 0;
// 把线程object转为id帮助GC回收,请查看cachedHoldCounter注释解释
// getThreadId是Unsafe.getLongVolatile操作,不用Thread.getId()是因为
// Thread的private long tid不是final或者volatile
// Thread的tid会重复利用(threadInitNumber++溢出导致重复)导致Thread.getId()获取的是旧tid
// 我只能仰望大佬们,这种极小概率发生的事件都考虑了,不像fix bug而像秀技
final long tid = getThreadId(Thread.currentThread());
}
// 缓存最后一个tryAcquireShared线程的可重入读锁,ThreadLocal.get速度没直接缓存快
// 帮助GC的原因是最后一个读锁release后线程消亡,正常此线程的资源空间需要被GC回收
// 如果HoldCounter保存Thread.currentThread()而不是tid,cachedHoldCounter引用了此线程
// 则GC无法回收此线程的资源
private transient HoldCounter cachedHoldCounter;
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 读可重入计数
private transient ThreadLocalHoldCounter readHolds;
// 第一个读锁
private transient Thread firstReader = null;
// 第一个读锁的重入计数,类型int,速度fast!fast!fast!
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
// 保证readHolds成功被赋值,setState()会调用AQS的private volatile int state
// 利用内存屏障保证readHolds先于setState()
// 没有setState可能会导致返回到Sync无初始化readHolds导致程序出错
// 以上只是我的猜测,这个问题类似double check单例的volatile问题
setState(getState()); // ensures visibility of readHolds
}
}
由以上数据结构相关代码可得:
- AQS的state混合WLock和RLock的锁数量
- RLock做了大量优化,firstReaderHoldCount和cachedHoldCounter
简要分析WLock的tryAcquire、tryRelease和RLock的tryAcquireShared、tryReleaseShared。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 获取WLock状态
int w = exclusiveCount(c);
if (c != 0) {
// W == 0 && c != 0证明有RLock已经获取仍未释放
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
// c == 0,无任何锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// WLock数量是否等于0
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// WLock获取锁未释放 && 不是当前线程则获取失败
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
// r = RLock获取读锁未释放数量
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 第一个RLock
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 通过缓存获取重入数量
HoldCounter rh = cachedHoldCounter;
// 如果缓存不存在或者缓存内的线程不是当前线程则更新缓存和获取最新重入数量
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// RLock需要阻塞或者CAS失败,进入此函数死循环直到成功或失败为止
return fullTryAcquireShared(current);
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// firstReader是Thread类型,重入数量为1释放后就是解锁
// 若释放后的线程消亡firstReader = null可以帮助GC回收消亡的线程
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// RLock与RLock之间无影响,return false无所谓
// 最后一个RLock释放完return true,此时AQS双链表有WLock就可以unpark
return nextc == 0;
}
}
细心的读者可能发现40行的代码比较有趣。如果一个线程WLock获取成功,可以继续获取RLock,WLock权限更高,此时的RLock业务层面上算是重入。 先获取RLock,继续获取WLock则是死锁。