前面做了n多准备,包括同步队列、阻塞队列、线程池、周期性线程池等等,今天终于可以开始深入研究连接池了,从HikariPool开始。
连接池存在的原因和线程池大概类似,因为数据库连接的获取是有开销的,频繁获取、关闭数据库连接会带来不必要的开销,影响系统性能。所以就有了数据库连接池。
数据库连接池技术提前创建好数据库连接,并对数据库连接进行池化管理:连接创建好之后交给连接池,应用需要连接的时候直接从连接池获取、而不需要创建连接,使用完之后归还给连接池、而不是关闭连接。从而减少频繁创建、关闭数据库连接锁带来的开销,极大提高系统性能。
今天从HikariPool开始,从源码角度学习了解数据库连接池的实现原理。HikariPool是目前公认性能最好的数据库连接池,同时也是SpringBoot2.0以后的默认连接池,相信今后会有越来越多的公司和项目使用HikariPool连接池。
关于数据库连接池的基本认知
先对数据库连接池的基本工作原理做个了解,不管是HikariPool、还是druid,所有的数据库连接池应该都是按照这个基本原理工作和实现的,带着这个思路去学习数据库连接池,避免盲人摸象。
数据库连接池一定会包含以下基本逻辑:
- 创建连接并池化:初始化的时候创建、或者是在应用获取连接的过程中创建,连接创建好之后放在连接池(内存中的容器,比如List)中保存。
- 获取数据库连接:接管了获取数据库连接的方法,从连接池中获取、而不是创建连接。
- 关闭数据库连接:接管了关闭数据库连接的方法,将连接归还到连接池、而不是真正的关闭数据库连接。
- 连接池维护:连接池容量、连接超时清理等工作。
带着这个思路研究HikariPool的源码,会有事半功倍的功效。
认识HikariPool的结构
包括以下几个部分:
- ConcurrentBag & PoolEntry
- addConnectionExecutor
- houseKeeperTask
- HikariProxyConnection
- closeConnectionExecutor
ConcurrentBag
直译就是“连接口袋”,就是放数据库连接的口袋,也就是连接池。
ConcurrentBag有3个保存数据库连接的地方(池):
- threadList:是一个ThreadLocal变量,保存当前线程持有的数据库连接。
- sharedList:是一个CopyOnWriteArrayList,线程安全的arrayList,数据库连接创建后首先进入sharedList。
- handoffQueue:是一个SynchronousQueue,数据库连接创建、进入sharedList后,也会进入handoffQueue等待连接请求。
除此之外,ConcurrentBag还有一个比较重要的属性waiters,记录向连接池获取数据库连接而不得、等待的线程数。
PoolEntry
连接池中存储的对象不是Connection,也不是Connection的代理对象,而是PoolEntry。
PoolEntry持有数据库连接对象Connection,Connection在实例化PoolEntry前创建。PoolEntry创建的过程中会同时创建两个ScheduledFuture任务:endOfLife和keepalive。
endOfLife提交MaxLifetimeTask定时任务,在PoolEntry创建后的maxLifetime(参数指定)执行。MaxLifetimeTask会关闭当前连接、同时根据需要创建新的连接加入到连接池。
keepalive提交KeepaliveTask周期性任务,在PoolEntry创建后按照keepalive(参数设定)周期性执行,在当前连接空闲的情况下检查连接是否可用(使用参数指定的ConnectionTestQuery进行测试),如果连接不可用则关闭并重新创建连接。
addConnectionExecutor
添加数据库连接的任务管理器。负责数据库连接的创建以及加入到连接池中(sharedList和handoffQueue)。
注意addConnectionExecutor本身是一个线程池ThreadPoolExecutor,线程池容量为最大数据库连接数,HikariPool初始化的过程中会以多线程(corePoolSize=CPU内核数量)的方式快速完成连接创建和池化,之后正常情况下会以单线程(corePoolSize=1)的方式创建数据库连接。
houseKeeperTask
是一个周期性线程池ScheduledExecutorService,初始化的时候创建,定时(housekeepingPeriodMs)执行,清理掉(关闭)连接池中多余的(大于最小空闲数)空闲连接,如果连接池中的连接数没有达到参数设置的数量(最大连接数、或者空闲连接没有达到最小空闲连接数)则创建连接。
HikariProxyConnection
HikariProxyConnection是数据库连接Connection的扩展类,应用层通过getConnection方法获取到的数据库连接其实不是不是数据库连接Connection、而是这个扩展类HikariProxyConnection。
使用扩展类HikariProxyConnection、而不是原生的数据库连接Connection的一个最直观的理由是,对于数据库连接池来说,连接的关闭方法close不是要关闭连接、而是要把连接交还给连接池。使用扩展类(或者代理类)就可以很容易的重写其close方法实现目标,而如果直接使用原生Connection的话,就没办法控制了。
closeConnectionExecutor
数据库连接池需要关闭的时候,通过closeConnectionExecutor线程池提交,关闭连接后closeConnectionExecutor还负责调用fillPool(),根据需要填充连接池。
好了,HikariPool的基础结构了解完了,接下来要进入源码分析了,主要包括:
- HikariPool的初始化
- 获取数据库连接 – getConnection方法
- 关闭数据库连接 – close方法
HikariPool的初始化
要开始分析源码了。
HikariPool是实例化的时候在构造方法中完成初始化。代码虽然不是很多,但是内容很多,所以还是分段、一步一步分析。
public HikariPool(final HikariConfig config)
{
super(config);
this.connectionBag = new ConcurrentBag(this);
this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
checkFailFast();
首先调用super也就是PoolBase的构造方法,主要设置配置的相关参数,初始化DataSource(initializeDataSource,获取到数据库连接的url、username、password等重要参数,创建DataSource)。
之后创建ConcurrentBag、完成ConcurrentBag的初始化(通过构造方法):创建handoffQueue、sharedList以及threadList。
接下来初始化houseKeepingExecutorService,也就是创建一个ScheduledThreadPoolExecutor,准备接收houseKeep任务。
然后调用checkFailFast(),checkFailFast()的作用是在初始化连接池之前首先做一次快速尝试:创建一个PoolEntry(通过createPoolEntry方法,稍后分析源码),如果创建成功则将其加入连接池后,返回,否则如果失败(数据库连接创建失败)则不断尝试直到耗费完设置的初始化时间initializationTimeout。
接着看初始化代码:
if (config.getMetricsTrackerFactory() != null) {
setMetricsTrackerFactory(config.getMetricsTrackerFactory());
}
else {
setMetricRegistry(config.getMetricRegistry());
}
setHealthCheckRegistry(config.getHealthCheckRegistry());
handleMBeans(this, true);
ThreadFactory threadFactory = config.getThreadFactory();
final int maxPoolSize = config.getMaximumPoolSize();
LinkedBlockingQueue addConnectionQueue = new LinkedBlockingQueue(maxPoolSize);
this.addConnectionQueueReadOnlyView = unmodifiableCollection(addConnectionQueue);
this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
以上代码主要完成以下几件事情:
MetricsTrackerFactory主要用来创建数据库连接的统计分析任务。
注册健康检查任务,默认是CodahaleHealthChecker,负责连接的健康检查,今天暂不涉及。
JMX处理,暂不涉及。
最关键的部分来了:创建addConnectionExecutor,addConnectionQueueReadOnlyView,closeConnectionExecutor,leakTaskFactory以及houseKeeperTask。
这几个线程池都非常重要。
addConnectionExecutor负责创建连接、封装到PoolEntry中之后加入连接池中。houseKeeperTask负责连接池清理、连接池中的连接数量没有达到参数设置的连接数量的话,houseKeeperTask还负责创建连接。leakTaskFactory负责创建连接泄露检查任务。
HikariPool连接池初始化的大部分工作都在以上几行代码中:
首先来看houseKeeperTask,HouseKeeperTask提交HouseKeeper任务,HouseKeeper是实现了Runnable接口的HikariPool的内部类,HouseKeeper检查当前空闲连接数如果大于参数设置的最小空闲连接数的话,会把超过idleTimeout未用的连接关闭。之后调用fillPool()方法。
fillPool()方法检查当前连接池中的连接数没有达到参数设置的最大连接数、或者空闲连接数没有达到参数设置的最小空闲连接数的话,通过调用addConnectionExecutor.submit方法提交poolEntryCreator任务、创建n个连接(并加入连接池)直到连接数量满足参数设置的要求。
PoolEntryCreator是实现了callable接口的任务,提交给线程池addConnectionExecutor之后,addConnectionExecutor会调用PoolEntryCreator的call()方法,call()方法调用createPoolEntry()方法创建数据库连接、创建之后调用connectionBag.add方法将新创建的PoolEntry加入连接池。
代码看到这里,我们可以得出一个结论:houseKeeperTask会通过addConnectionExecutor提交多个(参数最大连接数、最小空闲连接数指定)创建数据库连接的任务,从而完成数据库连接池中初始化连接的创建!
接下来,我们再看一下初始化的最后一段代码:
if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) {
addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));
addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));
final long startTime = currentTime();
while (elapsedMillis(startTime)
这段代码的意思是:如果参数blockUntilFilled设置为true(连接池没有完成初始化则阻塞)、并且参数InitializationFailTimeout>1(初始化阻塞时长)的话,则设置addConnectionExecutor的最大线程数和核心线程数为16和内核数量的最大值,阻塞等待初始化时长达到参数设定值、或者连接池中创建的连接数量已经大于最小空闲数量后,重新设置addConnectionExecutor的核心线程数和最大线程数为1。
这段代码要达到的目标是:在参数设定的初始化时长范围内,将addConnectionExecutor线程池加大到最大马力(线程数设置到尽可能最大)、以最短的时间完成初始连接的创建。初始连接创建完成之后,将addConnectionExecutor的线程数恢复为1,也就是说初始化的时候调动尽可能多的线程创建连接、初始化完成之后的连接创建实际上是由一个线程完成的。
初始化的代码分析完毕,但是为了可读性,我们跳过了createPoolEntry方法的源码,创建物理连接就是在createPoolEntry方法完成的,所以createPoolEntry方法也非常重要。
现在补上!
createPoolEntry方法
createPoolEntry是HikariPool连接池中唯一创建数据库连接的地方,通过线程池addConnectionExecutor绑定的PoolEntryCreator发起调用。
private PoolEntry createPoolEntry()
{
try {
final PoolEntry poolEntry = newPoolEntry();
final long maxLifetime = config.getMaxLifetime();
if (maxLifetime > 0) {
// variance up to 2.5% of the maxlifetime
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
final long lifetime = maxLifetime - variance;
poolEntry.setFutureEol(houseKeepingExecutorService.schedule(new MaxLifetimeTask(poolEntry), lifetime, MILLISECONDS));
}
final long keepaliveTime = config.getKeepaliveTime();
if (keepaliveTime > 0) {
// variance up to 10% of the heartbeat time
final long variance = ThreadLocalRandom.current().nextLong(keepaliveTime / 10);
final long heartbeatTime = keepaliveTime - variance;
poolEntry.setKeepalive(houseKeepingExecutorService.scheduleWithFixedDelay(new KeepaliveTask(poolEntry), heartbeatTime, heartbeatTime, MILLISECONDS));
}
return poolEntry;
}
catch (ConnectionSetupException e) {
if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
logger.error("{} - Error thrown while acquiring connection from data source", poolName, e.getCause());
lastConnectionFailure.set(e);
}
}
catch (Exception e) {
if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
logger.debug("{} - Cannot acquire connection from data source", poolName, e);
}
}
return null;
}
首先创建PoolEntry,然后通过线程池houseKeepingExecutorService绑定endOfLife以及keepalive任务。这两个任务的作用请参考本文的PoolEntry部分的描述。
我们还是没有看到创建数据库物理连接的地方,别急,他就在方法的第一行代码:newPoolEntry()中,我们看一下newPoolEntry方法。
PoolEntry newPoolEntry() throws Exception
{
return new PoolEntry(newConnection(), this, isReadOnly, isAutoCommit);
}
创建数据库物理连接,交给PoolEntry的构造方法去创建PoolEntry,最终PoolEntry对象会持有创建的数据库连接。
PoolEntry创建好之后,会调用connectionBag.add方法将其加入的连接池中。
ConcurrentBag#add
线程池addConnectionExecutor创建PoolEntry之后,会调用ConcurrentBag的add方法将其加入到连接池中:
public void add(final T bagEntry)
{
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
sharedList.add(bagEntry);
// spin until a thread takes it or none are waiting
while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
Thread.yield();
}
}
首先加入到sharedList中。
之后如果连接池有等待获取连接的线程、并且当前连接处于空闲状态,则提交当前PoolEntry给handoffQueue队列。
handoffQueue是手递手队列,如果当前时间点并没有排队想要从handoffQueue获取连接的线程存在的话,当前线程会挂起等待。这里所说的“当前线程”就是“创建数据库连接”的线程,是addConnectionExecutor线程池中的线程。
addConnectionExecutor中的线程在创建数据库连接poolEntry之后,有以下可能:
- 当前正好有应用需要获取数据库连接,并且通过thredList和shareList都没有获取到连接、需要到handoffQueue获取的话,则当前刚创建的数据库连接通过handoffQueue交给获取线程后,当前线程归还到addConnectionExecutor线程池中,如果有多个poolEntry在handoffQueue排队的话,当前线程yield等待
- 当前没有需要排队获取连接的线程(waiter=0),则poolEntry加入到shareList中之后,当前线程直接归还到addConnectionExecutor线程池中
小结
HikariPool的基本框架以及初始化过程、数据库连接的创建以及加入池中、连接池的houseKeep过程的源码分析完毕。剩下的就是应用层从池中获取连接、以及关闭连接的逻辑了,下一篇文章分析。
Thanks a lot!
上一篇 Java并发编程 Lock Condition & ReentrantLock(二)
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net