Looking at Concurrency and Deadlock Through Hikari



Introduction

Of the database connection pools benchmarked so far, Hikari is the fastest, so this article studies its source code and uses it as a starting point to look at concurrency problems more broadly.

The core concurrency class

The core of Hikari's speed is the ConcurrentBag class. Start with its fields:

public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable {

   // Copy-on-write array holding references to the connections. COW separates reads
   // from writes to boost read concurrency; a write locks, copies the array and swaps it in.
   private final CopyOnWriteArrayList<T> sharedList;
   // Chooses the implementation behind ThreadLocal<List> threadList.
   // If true, use the JDK's ArrayList holding weak references to PoolEntry (which wraps the
   // connection plus bookkeeping); if false, use Hikari's FastList.
   // FastList is optimized to be slightly faster than ArrayList; weakThreadLocals defaults to false.
   private final boolean weakThreadLocals;

   // Fast path for grabbing a PoolEntry
   private final ThreadLocal<List<Object>> threadList;
   // Listener that handles requests to add new PoolEntries
   private final IBagStateListener listener;
   // Number of threads currently waiting for a connection
   private final AtomicInteger waiters;
   private volatile boolean closed;

   // Queue used to hand a PoolEntry directly to a waiting thread
   private final SynchronousQueue<T> handoffQueue;
}
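
As the borrow code later shows, the thread-local list is appended to at the tail and scanned and removed from the tail, which is exactly the access pattern a simplified list class can optimize for. The sketch below only illustrates that kind of shortcut with an invented TailList; it is not Hikari's actual FastList.

import java.util.Arrays;

// Illustrative only -- an invented TailList, not Hikari's FastList.
// It supports just the "append at the tail, remove from the tail" pattern
// the thread-local list is used for, and skips the range checks and element
// shifting that a general-purpose ArrayList has to perform.
final class TailList<T> {
    private Object[] elements = new Object[16];
    private int size;

    void add(T element) {
        if (size == elements.length) {
            elements = Arrays.copyOf(elements, size * 2); // grow when full
        }
        elements[size++] = element;
    }

    @SuppressWarnings("unchecked")
    T removeLast() {
        // callers check size() first, just as borrow() checks the list size
        T element = (T) elements[--size];
        elements[size] = null; // drop the reference so it can be collected
        return element;
    }

    int size() {
        return size;
    }
}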

How a database connection is acquired

Skipping the less relevant code, go straight to getConnection in HikariPool.java:

   public Connection getConnection(final long hardTimeout) throws SQLException
   {
      suspendResumeLock.acquire();
      final long startTime = currentTime();

      try {
         long timeout = hardTimeout;
         do {
            PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
            if (poolEntry == null) {
               break; // We timed out... break and throw exception
            }

            final long now = currentTime();
            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {
               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
               timeout = hardTimeout - elapsedMillis(startTime);
            }
            else {
               metricsTracker.recordBorrowStats(poolEntry, startTime);
               return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
            }
         } while (timeout > 0L);

         metricsTracker.recordBorrowTimeoutStats(startTime);
         throw createTimeoutException(startTime);
      }
      catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
      }
      finally {
         suspendResumeLock.release();
      }
   }

On the normal path, execution goes through the connectionBag.borrow(...) call and returns via poolEntry.createProxyConnection(...).

Step into connectionBag.borrow:

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
    // Try the thread-local list first
    final List<Object> list = threadList.get();
    for (int i = list.size() - 1; i >= 0; i--) {
        final Object entry = list.remove(i);
        @SuppressWarnings("unchecked")
        final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
        // Claim the entry via CAS
        if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
        }
    }

    // Thread-local miss: scan sharedList, bumping the waiter count while we do
    final int waiting = waiters.incrementAndGet();
    try {
        for (T bagEntry : sharedList) {
            // Claim the entry via CAS
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                // If we may have stolen another waiter's connection, request another bag add.
                if (waiting > 1) {
                    // addBagItem will not add beyond the configured maximum pool size
                    listener.addBagItem(waiting - 1);
                }
                return bagEntry;
            }
        }
        // addBagItem will not add beyond the configured maximum pool size
        listener.addBagItem(waiting);

        // sharedList miss: block on handoffQueue
        timeout = timeUnit.toNanos(timeout);
        do {
            final long start = currentTime();
            // Wait for an entry to be handed off directly by a releasing thread
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                return bagEntry;
            }

            timeout -= elapsedNanos(start);
        } while (timeout > 10_000);

        // Timed out: acquisition failed
        return null;
    }
    finally {
        waiters.decrementAndGet();
    }
}

So a PoolEntry is looked up in the order threadList » sharedList » handoffQueue.

Both the threadList and the sharedList paths claim an entry by flipping its state with CAS; under high concurrency, threads fall through to the handoffQueue.
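
That state flip is nothing more than an atomic compare-and-set on an int field. Below is a minimal sketch of the idea using a hypothetical Entry class (not Hikari's actual PoolEntry), built on AtomicIntegerFieldUpdater so the hot field stays a plain volatile int:

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical bag entry: the state is a volatile int flipped by CAS,
// which is what lets borrow() claim an entry without taking any lock.
class Entry {
   static final int STATE_NOT_IN_USE = 0;
   static final int STATE_IN_USE = 1;

   private static final AtomicIntegerFieldUpdater<Entry> STATE =
         AtomicIntegerFieldUpdater.newUpdater(Entry.class, "state");

   private volatile int state = STATE_NOT_IN_USE;

   boolean compareAndSet(int expect, int update) {
      return STATE.compareAndSet(this, expect, update);
   }

   int getState() {
      return state;
   }

   void setState(int update) {
      state = update;
   }
}

Only one thread can win the CAS from STATE_NOT_IN_USE to STATE_IN_USE, so even though an entry may be visible in several threadLists and in the sharedList at once, it is handed to exactly one borrower.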

How a database connection is released

public void requite(final T bagEntry) {
   // First flip the connection back to the not-in-use state
   bagEntry.setState(STATE_NOT_IN_USE);
   // If any borrower missed its thread-local list, waiters must be > 0
   for (int i = 0; waiters.get() > 0; i++) {
      // If another thread has already taken this connection via threadList or sharedList, return.
      // Otherwise try handoffQueue.offer, which returns true only when a consumer
      // is currently blocked on the queue, and false otherwise.
      if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
         return;
      }
      // Avoid burning CPU under heavy contention with many waiters: park briefly every 256 spins
      else if ((i & 0xff) == 0xff) {
         parkNanos(MICROSECONDS.toNanos(10));
      }
      // Otherwise yield, giving the waiting threads a chance to pick the entry up
      else {
         yield();
      }
   }
   // No waiters, or every waiter lost the race for this connection:
   // stash the PoolEntry in this thread's threadList
   final List<Object> threadLocalList = threadList.get();
   threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}

As you can see, releasing a connection is written with heavy contention in mind.
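
The handoffQueue behaviour that requite relies on, offer succeeding only when a consumer is already blocked in poll, is the standard contract of SynchronousQueue. A small self-contained demo (not Hikari code) showing both outcomes:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class HandoffDemo {
   public static void main(String[] args) throws InterruptedException {
      SynchronousQueue<String> queue = new SynchronousQueue<>();

      // No consumer is waiting, so offer() fails immediately.
      System.out.println("offer with no consumer: " + queue.offer("entry"));

      // Start a consumer that blocks in poll() for up to one second.
      Thread consumer = new Thread(() -> {
         try {
            System.out.println("consumer got: " + queue.poll(1, TimeUnit.SECONDS));
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
         }
      });
      consumer.start();
      Thread.sleep(100); // give the consumer time to block in poll()

      // A consumer is now parked in poll(), so offer() hands the element over.
      System.out.println("offer with consumer waiting: " + queue.offer("entry"));
      consumer.join();
   }
}

This is why a returned connection goes straight to a blocked borrower when one exists, and back into the releasing thread's threadList otherwise.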

Conclusion: Hikari's speed comes from ConcurrentBag's two layers of "cache", threadList and then sharedList. Both layers are lock-free, with sharedList resolving contention through CAS; only the final handoffQueue stage involves locking and blocking.

How connection acquisition can easily deadlock

Recall the four necessary conditions for deadlock:

  1. Mutual exclusion: a resource is exclusive and used by one process at a time; at any moment a resource can be assigned to only one process, and any other process that requests it must wait until the holder releases it.
  2. No preemption: a resource cannot be forcibly taken from the process holding it before that process is finished with it; it can only be released voluntarily by its holder.
  3. Hold and wait: a process requests the resources it needs piece by piece, and while requesting new resources it keeps holding the resources already allocated to it.
  4. Circular wait: when deadlock occurs there is a chain of processes {P1, P2, …, Pn} in which P1 waits for a resource held by P2, P2 waits for a resource held by P3, …, and Pn waits for a resource held by P1, forming a waiting cycle in which every resource held by one process is simultaneously requested by the next.

Here is a Spring demo with Hikari's maximum pool size set to 10. The core code:

@Service
public class DataSourceDeadLockImpl {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private ApplicationContext applicationContext;

    @Transactional
    public void outerTransaction() {
        logger.info("Outer transaction.");
        DataSourceDeadLockImpl relativeService = applicationContext.getBean(getClass());
        relativeService.innerTransaction();
    }

    // With propagation = Propagation.REQUIRES_NEW, Spring's transaction manager requests a new connection for the inner transaction
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void innerTransaction() {
        logger.info("Inner transaction.");
    }
}

Run outerTransaction from 10 concurrent threads and the pool deadlocks; the reason is simply that all four necessary conditions above are satisfied.
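
A minimal driver for such a test might look like the sketch below. The class name and wiring are illustrative and not part of the original demo; it only assumes the DataSourceDeadLockImpl bean shown above:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical driver: each thread holds one connection for the outer
// transaction and then waits for a second one for the inner REQUIRES_NEW
// transaction. With maximumPoolSize = 10 and 10 threads, all 10 connections
// end up held by outer transactions and no thread can obtain its second one.
public class DeadLockDriver {

    private final DataSourceDeadLockImpl service;

    public DeadLockDriver(DataSourceDeadLockImpl service) {
        this.service = service;
    }

    public void trigger() {
        int threads = 10;
        CountDownLatch startGate = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    startGate.await();          // release all threads at once
                    service.outerTransaction(); // needs 2 connections in total
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        startGate.countDown();
        pool.shutdown();
    }
}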

So what should you do? Quoting the FAQ from Hikari's GitHub wiki:

Q: What about “pool-locking” (deadlocking)? A: The prospect of “pool-locking” has been raised with respect to single actors that acquire many connections. This is largely an application-level issue. Yes, increasing the pool size can alleviate lockups in these scenarios, but we would urge you to examine first what can be done at the application level.

The calculation of pool size in order to avoid deadlock is a fairly simple resource allocation formula:

pool size = Tn x (Cm - 1) + 1

Where Tn is the maximum number of threads, and Cm is the maximum number of simultaneous connections held by a single thread.

For example, imagine three threads (Tn=3), each of which requires four connections to perform some task (Cm=4). The pool size required to ensure that deadlock is never possible is:

pool size = 3 x (4 - 1) + 1 = 10

Another example, you have a maximum of eight threads (Tn=8), each of which requires three connections to perform some task (Cm=3). The pool size required to ensure that deadlock is never possible is:

pool size = 8 x (3 - 1) + 1 = 17

In my demo Cm is 2 and Tn is 10, so pool size = 10 x (2 - 1) + 1 = 11 is enough to guarantee that deadlock cannot happen. A real application does not have a fixed maximum concurrency, though: the value has to be derived from the highest concurrency the load balancer can route to each machine in the cluster.

There are usually two ways to deal with this:

  1. Pay close attention to code where a single thread needs N different database connections. Where possible, give those usages N separate DataSources, e.g. configure multiple transactionManagers in Spring and annotate the corresponding business methods with:
    
    @Transactional(transactionManager = "transactionManagerN", propagation = Propagation.REQUIRES_NEW)
    
  2. Set a connection timeout on the database, so the threads stuck in the deadlock eventually fail; the application is still unavailable while the deadlock lasts, though (a pool-side equivalent is sketched after this list).
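
On the pool side, the hardTimeout we saw flowing through getConnection is HikariCP's connectionTimeout, so bounding the wait (and sizing the pool per the formula above) can be done in configuration. A minimal sketch with illustrative values:

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class PoolConfigSketch {
    public static HikariDataSource dataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/demo"); // illustrative URL
        config.setUsername("demo");                            // illustrative credentials
        config.setPassword("demo");
        config.setMaximumPoolSize(11);      // Tn x (Cm - 1) + 1 for the demo above
        config.setConnectionTimeout(5_000); // getConnection gives up after 5s instead of waiting forever
        return new HikariDataSource(config);
    }
}

A timeout only turns an indefinite hang into a fast failure; sizing the pool by the formula, or reducing the number of connections a single thread holds, is what actually prevents the deadlock.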