JUCL之梳理(一)



简介

JUCL是平时编程“import java.util.concurrent.locks”,是java的私生子。 java长子相关的synchronized关键字与notify和wait方法因有比较大的限制,如

  1. synchronized前期效率低下;
  2. wait没有超时时间;
  3. notify不能指定唤醒对象,也无法做到公平性;
  4. wait必须比notify先执行,否则wait的线程永久阻塞,需要调用notifyAll;
  5. 编程繁琐,耦合严重,需要依赖object实例锁定资源,而JUCL是可以面向线程。

有以上痛点所以才有JUCL的一亩三分地。

互联网有很多JUCL相关包的分析,我不想过多分析源代码(但还是有)、无谓的流程和异常处理,重点看数据结构和思考设计意图。 我能力水平有限,无法透彻讲解synchronized,故不班门弄斧。

低层建筑CAS和LockSupport

CAS:Compare and Swap

原子操作,简而言之就是符合预期的话就交换,成功返回true,否则false。 查看AbstractQueuedSynchronizer.java的CAS相关的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;

static final class Node {
    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;
}

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

volatile关键字就两个意图:

  1. 保证可见性。告诉JVM禁止从线程工作内存读写,老老实实在主存读写。不清楚可自行搜索Java内存模型。
  2. 保证有序性。添加内存屏障告诉编译器别自作聪明瞎优化导致代码不符合程序员编写顺序。

这里是为了保证可见性,配合Unsafe,Unsafe的方法大部分都带有native,是JNI,会调用C/C++。

有C语言基础看以上代码很容易看出unsafe.objectFieldOffset就是根据类获取对应字段的偏移量,而后compareAndSwapxxx获取实例中对应字段的真正内存位置,通过指针操作内存交换数据。

根据CAS定义会先判断是否符合预期交换(更新),伪代码就是getAndSwap,需要保证原子性,在java层面上需要加锁,而C/C++可以从底层硬件做到原子性,为了性能调用C/C++。

CAS会有ABA问题,可以通过AtomicStampedReference.java解决,原理是添加stamp变量。

LockSupport

LockSupport.park和LockSupport.unpark依然是JNI调用C/C++,需要依赖操作系统线程休眠和唤醒。 原理是利用线程变量初始permit,初始为0。

  1. park会判断是否大于1,若大于则设为0并返回,否则等待唤醒(包含其他线程中断此线程)或超时自起,随后设置permit为0;
  2. unpark直接设置permit为1,并唤醒线程;

故unpark可以先于park执行,park会立刻返回不会阻塞:)

CAS和LockSupport底层结合上层应用可以解决java锁与并发的痛点。

AbstractQueuedSynchronizer

低层建筑已经有了,需要依赖上层进而封装,最重要就是AbstractQueuedSynchronizer,业界简称AQS。

JUC包内的大部分实现都依赖于AQS,而AQS的核心就是CAS和LockSupport。

介绍AQS之前,我想让大家思考为什么wait和notify要在synchronized代码块里面?如果没有synchronized会发生什么事情? 以下面代码为example:

1
2
while (!condition)
    wait();

此代码就是等待condition为true则往下执行。没有synchronized会发生线程之间竞争condition(全局资源),造成:

  1. 唤醒时condition为true,随后被其他线程修改成false,造成假唤醒,继续等待;
  2. 准备notify的线程A(这里假设也是没有synchronized代码块)要通知waiting的B线程,若此时C线程准备执行while (!condition),则线程B和C会竞争condition。

可见synchronized的作用就是锁,把condition转化为同步队列,让线程按序获取。

JUCL的Lock都包含了一个内部类,而内部类继承AQS。 Lock主要分为两种:

  1. 独占锁,即某线程拿了锁,其他线程就无法拿到锁需阻塞等待。如常用的ReentrantLock,ReentrantReadWriteLock.WriteLock。
  2. 共享锁,即能持有共享锁的线程均能获得锁,直到达到非共享状态条件则获取共享锁失败需要等待。如常用的Semaphore,CountDownLatch和ReentrantReadWriteLock.ReadLock。

AQS有两个内部类,一个是Node双向链表节点,另外一个ConditionObject实现了Condition接口。 Condition包含了await()和signal(),取代wait()和notify(),另外还有其余的超时await。

在await()上的注释有一小段如下:

The current thread is assumed to hold the lock associated with this when this method is called

结合wait需要synchronized的例子,相信读者也明白这英文的意图是什么了。

AbstractQueuedSynchronizer关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    // head节点prev和thread均为null,是无效节点
    private transient volatile Node head;

    // tail节点next为null,thread不为null,是有效节点
    private transient volatile Node tail;

    // 状态,子类自定义。如ReentrantLock是重载锁的个数,ReentrantReadWriteLock则读和写通过mask共用state。
    private volatile int state;

    // 独占锁获取锁API
    public final void acquire(int arg);

    // 独占锁释放锁API
    public final boolean release(int arg);

    // 共享锁获取锁API
    public final void acquireShared(int arg);

    // 共享锁释放锁API
    public final boolean releaseShared(int arg);

    // 模板模式,子类实现
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    // 模板模式,子类实现
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    static final class Node {

        static final Node SHARED = new Node();

        static final Node EXCLUSIVE = null;

        // 节点状态被取消
        static final int CANCELLED =  1;
        // 后继节点状态需要被唤醒
        static final int SIGNAL    = -1;
        // 节点状态在ConditionObject的队列
        static final int CONDITION = -2;
        // 节点状态在共享锁传播状态,老版本没有这个状态,是为了解决releaseShared并发问题而引入
        static final int PROPAGATE = -3;
        // 节点状态
        volatile int waitStatus;
        // AQS上一节点,ConditionObject无作用null
        volatile Node prev;
        // AQS下一节点,ConditionObject无作用null
        volatile Node next;
        // 每个节点对应一个线程
        volatile Thread thread;
        // AQS用来区分独占锁和共享锁,ConditionObject是单向链表的下一个node
        Node nextWaiter;
    }

    public class ConditionObject implements Condition, java.io.Serializable {
        // 单向链表head
        private transient Node firstWaiter;
        // 单向链表tail
        private transient Node lastWaiter;

        public final void await();

        public final void signal();
    }
}

重点分析共享锁acquireShared、releaseShared相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// AQS获取共享锁入口
public final void acquireShared(int arg) {
    // 子类实现tryAcquireShared获取锁的逻辑,失败则进入AQS双向链表
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
// AQS释放共享锁入口
public final boolean releaseShared(int arg) {
    // 子类实现tryReleaseShared释放锁的逻辑,成功则unpark AQS双向链表
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

private void doAcquireShared(int arg) {
    // 新增共享状态节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 前驱节点是否是head
            if (p == head) {
                // 尝试获取锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 断开旧head,把当前节点变成head并传播unpark
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 节点park前置条件是前驱节点waitStatus是SIGNAL
            // 前驱节点waitStatus是0则变为SIGNAL,下一个循环准备park
            // 前驱节点waitStatus是SIGNAL则park
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        // 临界竞争点
        setHead(node);
        // propagate有可能为0,需要依赖waitStatus=PROPAGATE=-3传播unpark
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 正常流程
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);
            }
            // 并发调用doReleaseShared的异常流程
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)
            break;
    }
}

看共享锁部分的源码会思考PROPAGATE的用处是什么,这问题折腾我不少时间。

以下场景用了Semaphore。Semaphore内部类继承AQS,是最简单的AQS共享锁运用,如果读者不了解请先查看相关资料。 假设有4个线程A、B、C和D有一个共享变量Semaphore = new Semaphore(0),A和B调用acquireShare,C和D调用releaseShared。

  1. 开始线程A、B调用acquireShared均park,此时AQS队列是head»A»B;
  2. 此时线程C调用releaseShared,进入doReleaseShared的正常流程unpark线程A;
  3. 此时线程A被唤醒进入setHeadAndPropagate,运行到上述代码的53行临界竞争点;
  4. 此时线程D调用releaseShared,进入doReleaseShared的异常流程,线程A中的h状态由0->PROPAGATE;
  5. 线程A propagate = 0,需要依赖PROPAGATE=-3传播unpark线程B,否则线程B一直park。

PROPAGATE就是为了解决线程在AQS共享锁park的时候,releaseShared并发导致共享锁无法传播unpark。

简要文字记录acquire、release、acquireShared、releaseShared、await和signal的正常(忽略中断和取消)流程,具体还请读者对照源码。

acquire

  1. 尝试获取锁,成功结束流程否则继续流程;
  2. 在双向链表利用CAS在链表尾部添加独占状态的节点, 若链表未初始化则先初始化再添加;
  3. 死循环流程4.5.6;
  4. 判断节点的前驱节点是否是head并且尝试获取锁是否成功,都成功则设置节点为head节点,旧head断开等待GC回收,结束流程,否则继续流程;
  5. 前驱节点的waitStatus状态为0则利用CAS设置为SIGNAL,跳转到3。前驱节点的waitStatus状态为SIGNAL则继续流程;
  6. park此节点对应的线程,等待unpark随后跳转到3。

只有在流程1无双向链表或流程4节点在链表中最靠前才能结束。

release

  1. 尝试释放锁,失败结束返回false否则继续流程;
  2. 判断双向链表是否初始化,未初始化则代表无队列等待,结束返回true,否则继续流程;
  3. 判断head的waitStatus状态为SIGNAL,且head的后继节点不为空,均为真则unpark next节点对应的线程,结束返回true。

acquireShared

  1. 尝试获取锁,成功结束流程否则继续流程;
  2. 在双向链表利用CAS在链表尾部添加共享状态的节点, 若链表未初始化则先初始化再添加;
  3. 死循环流程4.5.6
  4. 判断尝试获取锁是否成功,成功则旧head断开等待GC回收,调用setHeadAndPropagate设置节点为head节点,调用doReleaseShared unpark链表中最靠前的节点对应的线程,否则继续流程;
  5. 前驱节点的waitStatus状态为0则利用CAS设置为SIGNAL,跳转到3。前驱节点的waitStatus状态为SIGNAL则继续流程;
  6. park此节点对应的线程,等待unpark随后跳转到3。

对比acquire,acquireShared主要不同于共享锁成功获取锁后需要传播head的后继节点进行unpark其他线程。这是共享锁特性,一个获取成功了,通知其他等待的共享节点。

releaseShared

  1. 尝试释放锁,失败结束返回false否则继续流程;
  2. 调用doReleaseShared 循环判断head的waitStatus状态为SIGNAL,且head的后继节点不为空,均为真则unpark后继节点对应的线程,结束返回true,否则设置head的waitStatus状态为PROPAGATE。

await

  1. 加入ConditionObject单向链表,此时节点状态为CONDITION;
  2. release之前持有的锁;
  3. park等待signal;
  4. 被signal unpark后判断是否在AQS双向链表上,是则进入流程5;
  5. 死循环流程6.7
  6. 判断节点的前驱节点是否是head并且尝试获取锁是否成功,都成功则设置节点为head节点,旧head断开等待GC回收,结束流程,否则继续流程;
  7. park此节点对应的线程,等待unpark随后跳转到5

signal

  1. 获取ConditionObject单向链表head;
  2. 断开单向链表head节点;
  3. 通过CAS改变旧节点的状态由CONDITION->0;
  4. 把旧节点append到AQS双向链表上变为tail节点;
  5. AQS双链表tail的前驱节点利用CAS改变waitStatus由0->SIGNAL;
  6. unpark tail节点对应的线程。

我在阅读AQS的源码有一个疑问,为什么AQS和ConditionObject要共用一个双向链表Node类,明明ConditionObject是单向链表,有很多数据不需要,这样太浪费空间。 其实看了await和signal流程就会明白ConditionObject的节点始终要append到AQS的尾部,此时只要设置ConditionObject的pre连接到AQS原tail即可。