线程池之BlockingQueue



简介

BlockingQueue是线程池的一个非常重要的参数,一般实现有3种,分别是ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue。 本文研究他们的take和put方法源码,分析他们的异同点。

ArrayBlockingQueue

ArrayBlockingQueue是一个无界的环形数组阻塞队列,相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    // 数组
    final Object[] items;
    // 读索引
    int takeIndex;
    // 写索引
    int putIndex;
    // 队列数量
    int count;
    // 并发锁
    final ReentrantLock lock;
    // 读等待
    private final Condition notEmpty;
    // 写等待
    private final Condition notFull;

    // 默认非公平锁
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    // 公平锁
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 队列空,等待
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        final Object[] items = this.items;
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        // 自增 & 判断是否环形
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        // condition队列没数据signal不影响
        notFull.signal();
        return x;
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 队列满,等待
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        // 自增 & 判断是否环形
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // condition队列没数据signal不影响
        notEmpty.signal();
    }
}

数据结构层面上是一个环形数组,分别记录读写索引和队列总数,并发依赖一把ReentrantLock,支持公平锁。

LinkedBlockingQueue

LinkedBlockingQueue是一个单链表阻塞队列,相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }
    // 队列容量,最大Integer.MAX_VALUE
    private final int capacity;
    // 队列数量,原子性
    private final AtomicInteger count = new AtomicInteger();
    // 居然没有private,骚气!
    // head.item = null
    transient Node<E> head;
    // last.next = null
    private transient Node<E> last;
    // 读锁
    private final ReentrantLock takeLock = new ReentrantLock();
    // 读等待
    private final Condition notEmpty = takeLock.newCondition();
    // 写锁
    private final ReentrantLock putLock = new ReentrantLock();
    // 写等待
    private final Condition notFull = putLock.newCondition();

    // 默认无界
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 队列空,等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            // 原子操作,底层实现是CAS乐观锁
            c = count.getAndDecrement();
            // c为count未自减前,若自减前大于1,则队列仍可读,唤醒读等待(若无读等待signal不起作用)
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // c为count未自减前,若自减前队列满,证明有写等待,唤醒
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 队列满,等待
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            // 原子操作,底层实现是CAS乐观锁
            c = count.getAndIncrement();
            // c为count未自增前,若自增后小于容量,则队列仍可写,唤醒写等待(若无写等待signal不起作用)
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // c为count未自增前,若自增前队列空,证明有读等待,唤醒
        if (c == 0)
            signalNotEmpty();
    }

    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

数据结构层面上是一个head.item=null,tail.next=null的单向链表,并发依赖两把ReentrantLock和CAS乐观锁,不支持公平锁。

LinkedBlockingQueue和ArrayBlockingQueue数据结构差异造成的公平性

不知道读者有没有想过LinkedBlockingQueue为什么没有实现公平锁?我查看源码注释并没有提及到fairness这关键字。 一开始猜测是为了吞吐量而不实现公平锁。当和ArrayBlockingQueue的数据结构和take、put算法对比的时候才恍然大悟。

分析LinkedBlockingQueue的一种情况:

  1. 数组为空,A线程take,获取takeLock,队列为空,调用await,释放takeLock并park阻塞;
  2. B线程put,获取putLock,enqueue但未运行到signalNotEmpty;
  3. 此时C线程take,获取takeLock,dequeue,释放takeLock;
  4. B线程运行signalNotEmpty,获取takeLock,调用signal把condition的队列append到AQS队列上,唤醒线程A,释放takeLock;
  5. 线程A被唤醒,发现队列仍然为空,调用await,释放takeLock并park阻塞;

条件是A线程先take,C线程后take。结果是线程C先获取数据,不符合公平性。 原因是take和put两把锁,若是同一把锁,则在上面的流程3,线程C阻塞等待不会dequeue。

不管LinkedBlockingQueue有无设置公平锁,数据结构和算法都不允许实现公平性,所以对fairness一字不提:)

SynchronousQueue

SynchronousQueue这个阻塞队列比较奇特,像LinkedBlockingQueue和ArrayBlockingQueue都是在出现空和满的情况阻塞。 SynchronousQueue很像通过队列的两次握手,take或者put任意缺少一方则阻塞等待另一方put或者take,另外一方出现则传输数据并唤醒对方线程。

源码重复比较多,但数据结构又是我重点关注的,这次先抛出结论再来分析源码。

SynchronousQueue实现队列有两种方案:

  1. 扩展的Treiber stack的LIFO的栈,实现非公平锁,解决并发依赖CAS乐观锁。
  2. 扩展的Lock-free dualqueue(除了jdk源码,我未找到Java版本)的FIFO队列,实现公平锁,解决并发依赖CAS乐观锁。

以上两种队列都是单链表,区别是LIFO和FIFO以及非公平和公平。为什么会有非公平锁的出现?会不会为了提高吞吐量? 我经过坚持不懈的测试,发现两者吞吐差别非常小(非公平高并发较快),从源码上查看也无法找出非公平锁哪里快于公平锁,能力有限,请谅解Orz。 我只贴出非公平锁stack相关的代码,queue的逻辑大同小异,不同在于数据结构和部分处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    static final class TransferStack<E> extends Transferer<E> {
        // 消费者状态
        static final int REQUEST    = 0;
        // 生产者状态
        static final int DATA       = 1;
        // 发生CAS竞争
        static final int FULFILLING = 2;

        // 栈数据结构
        static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // 匹配的节点,匹配自己则代表取消
            volatile Thread waiter;     // 等待的线程
            Object item;                // null代表REQUEST,有数据就是DATA
            int mode;
        }

        E transfer(E e, boolean timed, long nanos) {

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // 空或者同mode都进入准备等待
                    if (timed && nanos <= 0) {      // 一般不进入
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    // 新建节点并设置为head节点并插入旧head节点前面成为新的head节点
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // 匹配等于自己,取消模式
                            clean(s);
                            return null;
                        }
                        // 被唤醒后,无竞争条件是head为FULFILLING即s的匹配节点,next就是s节点
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // pop出s和head节点,设置s.next为新head
                        // 消费者则返回匹配的节点item,生产者则返回自己的节点item
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    // 新建FULFILLING节点并插入旧head节点前面成为新的head节点
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // 死循环
                            // FULFILLING节点后面肯定是匹配节点
                            SNode m = s.next;
                            // 匹配节点被并发竞争处理了
                            if (m == null) {
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            // CAS设置匹配节点为s节点(相互匹配),并唤醒匹配节点对应的线程
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop出s和m节点,设置mn为新head
                                // 消费者则返回匹配的节点item,生产者则返回自己的节点item
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                    // head为FULFILLING节点的时候就会进入这里,帮FULFILLING做事情而已
                    // 逻辑同上,缺少return,我觉得英文注释的help有利用CAS“等待”FULFILLING完成的意图
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

        boolean tryMatch(SNode s) {
            if (match == null &&
                UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                Thread w = waiter;
                if (w != null) {    // waiters need at most one unpark
                    waiter = null;
                    LockSupport.unpark(w);
                }
                return true;
            }
            return match == s;
        }

        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                // 先自旋等待一段时间,优化一波吞吐量
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }
    }

    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
}

希望读者可以自己debug一下SynchronousQueue,因过程涉及到大量的CAS,单看源代码理解起来会很晦涩。 因为是CAS乐观锁队列,并发量是比ArrayBlockingQueue和LinkedBlockingQueue高。

通过源码也知道SynchronousQueue是容量为1的队列,队列空take就阻塞。为什么没有其他容量为X,CAS的乐观锁的队列,AQS都可以扔掉了?

有两个原因:

  1. CAS是需要自旋的,而且SynchronousQueue还自己自旋优化提高并发,简直是压榨干CPU了;
  2. CAS锁只能锁原子资源,对于上下文的资源无法锁定。

此时CPU的使用率会UP UP UP,试问一个高并发的系统,自身难保还要在队列消耗这么多的CPU资源,这是不切合实际的,所以AQS锁应用才会这么广。

队列的应用场景

通过这篇文章可以知道:

  1. 在要保持队列公平的情况下,需要传输大量数据。选择ArrayBlockingQueue;
  2. 不需保持队列公平的情况下,需要传输大量数据,低并发(基本上是1生产者1消费者),可以预先分配内存。选择ArrayBlockingQueue;
  3. 不需保持队列公平的情况下,需要传输大量数据,并发不低,选择LinkedBlockingQueue;
  4. 不需保持队列公平的情况下,需要队列无界,选择LinkedBlockingQueue;
  5. 需要实时性较高的传递资源,如数据库和线程连接池,选择SynchronousQueue。

这些选择都跟自身特性有关,我不想废话太多:)

测试demo

以下是我写的测试demo,读者可以修改参数更深入理解这3个队列。例如修改LinkedBlockingQueue的容量接近个位数对比SynchronousQueue的吞吐量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class BlockQueueTest {

    public static void main(String[] args) throws InterruptedException {


        SynchronousQueue<Integer> synchronousQueue;
        LinkedBlockingQueue<Integer> linkedBlockingQueue;
        ArrayBlockingQueue<Integer> arrayBlockingQueue;

        System.out.println("==================================================");

        // change us
        final int size = 100000;
        final int sendThreads = 50;
        final int recThreads = 50;
        final int capacity = 10;

        synchronousQueue = new SynchronousQueue<>();
        linkedBlockingQueue = new LinkedBlockingQueue<>(capacity);
        arrayBlockingQueue = new ArrayBlockingQueue<>(capacity);

        System.out.println(String.format("unfairness queue test: size %d, capacity: %d, send thread: %d, received thread: %d",
            size, capacity, sendThreads, recThreads));

        testThroughput(size, sendThreads, recThreads, synchronousQueue);
        testThroughput(size, sendThreads, recThreads, linkedBlockingQueue);
        testThroughput(size, sendThreads, recThreads, arrayBlockingQueue);

        System.out.println("==================================================");

    }

    private static void testThroughput(int size, int sendThreads, int recThreads, BlockingQueue<Integer> queue) throws InterruptedException {
        int perSize = size / sendThreads;

        ArrayList<Thread> receivedThreads = new ArrayList<>();
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(size);
        for (int i = 0; i < sendThreads; i++) {
            int k = i;
            new Thread(() -> {
                for (int j = 0; j < perSize; j++) {
                    try {
                        start.await();
                        queue.put(k * perSize + j);
                    } catch (InterruptedException e) {
                    }
                }
            }).start();
        }
        for (int i = 0; i < recThreads; i++) {
            Thread thread = new Thread(() -> {
                while (true) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        break;
                    } finally {
                        finish.countDown();
                    }
                }
            });
            thread.start();
            receivedThreads.add(thread);
        }
        long startTime = System.currentTimeMillis();
        start.countDown();
        finish.await();

        long eclipsed = System.currentTimeMillis() - startTime;
        receivedThreads.forEach(Thread::interrupt);

        System.out.println(queue.getClass().getSimpleName() + " eclipsed " + eclipsed + " millisecond.");
    }
}