对象池概述
对象池是通过一定的规则来维护对象集合的容器。commos-pool在很多场景中,用来实现"连接池"/"任务worker池"等,大家常用的dbcp数据库连接池,也是基于commons-pool实现。
一个非常常见的实现就是RedisPool的实现:
commons-pool实现思想非常简单,它主要的作用就是将"对象集合"池化,任何通过pool进行对象存取的操作,都会严格按照"pool配置"(比如池的大小)实时的创建对象/阻塞控制/销毁对象等。它在一定程度上,实现了对象集合的管理以及对象的分发。
对象池具有如下优势:
将创建对象的方式,使用工厂模式;
通过"pool配置"来约束对象存取的时机
将对象列表保存在队列中(LinkedList)
对象池获取和归还对象逻辑如下:
对象池源码分析
GenericObjectPoolConfig
对象池Config具有如下配置项:
maxTotal: 最大值总数,当已借出数量大于这个总数,激活removeAbandoned方法清理超期不还的对象
maxActive: 链接池中最大连接数,默认为8.
maxIdle: 链接池中最大空闲的连接数,默认为8.
minIdle: 连接池中最少空闲的连接数,默认为0
maxWait: 当连接池资源耗尽时,调用者最大阻塞的时间,超时将跑出异常。单位,毫秒数;默认为-1.表示永不超时.
minEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值(-1)表示不移除。
softMinEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲链接将会被移除,且保留“minIdle”个空闲连接数。默认为-1.
numTestsPerEvictionRun: 对于“空闲链接”检测线程而言,每次检测的链接资源的个数。默认为3.
testOnBorrow: 向调用者输出“链接”资源时,是否检测是有有效,如果无效则从连接池中移除,并尝试获取继续获取。默认为false。建议保持默认值.
testOnReturn: 向连接池“归还”链接时,是否检测“链接”对象的有效性。默认为false。建议保持默认值.
testWhileIdle: 向调用者输出“链接”对象时,是否检测它的空闲超时;默认为false。如果“链接”空闲超时,将会被移除。建议保持默认值.
timeBetweenEvictionRunsMillis: “空闲链接”检测线程,检测的周期,毫秒数。如果为负值,表示不运行“检测线程”。默认为-1.
whenExhaustedAction: 当“连接池”中active数量达到阀值时,即“链接”资源耗尽时,连接池需要采取的手段, 默认为1:
-> 0 : 抛出异常,
-> 1 : 阻塞,直到有可用链接资源
-> 2 : 强制创建新的链接资源
blockWhenExhausted:这个参数控制在对象池没有对象可以borrow的时候,是否阻塞。搭配maxWaitDuration一起使用。
maxWaitDuration:阻塞最长等待时间。搭配blockWhenExhausted一起使用,当borrow时阻塞,判断maxWaitDuration的正负,如果是负值,borrow的时候就调用LinkedBlockingDeque的takeFirst方法阻塞(一直阻塞,等别人唤醒),如果是非负,就调用pollFirst阻塞(周期性自己唤醒自己)
GenericObjectPool
GenericObjectPool中的核心成员变量包括:
// 是一个阻塞双端队列,用于存储池子中的空闲对象
private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
// 从BaseGenericObjectPool继承过来的,标志插入对象时是从头插还是从尾插,默认为true,从头插
private volatile boolean lifo = BaseObjectPoolConfig.DEFAULT_LIFO;
// 数量计数器,可以代替直接执行对象列表操作增减
private final AtomicLong createCount = new AtomicLong();
// 对象工厂,提供create、激活、验证、destroy等能力,在对象池各种操作中回调,例如JedisFactory
private final PooledObjectFactory<T> factory;
// 所有对象map,是一个concurrentHashMap,允许多个线程从这个map中获取对象,从而实现池化
private final Map<IdentityWrapper<T>, PooledObject<T>> allObjects = new ConcurrentHashMap<>();
构造方法
public GenericObjectPool(final PooledObjectFactory<T> factory)
传入工厂类则以默认配置构造GenericObjectPool,还可以增加GenericObjectConfig参数补充对象池配置,方法如下:
public GenericObjectPool(final PooledObjectFactory<T> factory,
final GenericObjectPoolConfig<T> config) {
super(config, ONAME_BASE, config.getJmxNamePrefix());
if (factory == null) {
jmxUnregister(); // tidy up
throw new IllegalArgumentException("Factory may not be null");
}
this.factory = factory;
idleObjects = new LinkedBlockingDeque<>(config.getFairness());
setConfig(config);
}
此构造函数实例化了一个LinkedList作为"对象池"容器,用来存取"对象"
继续跟进setConfig方法
setConfig
public void setConfig(final GenericObjectPoolConfig<T> conf) {
super.setConfig(conf);
setMaxIdle(conf.getMaxIdle());
setMinIdle(conf.getMinIdle());
setMaxTotal(conf.getMaxTotal());
}
在GenericObjectPool中设置了最大空闲、最小空闲、最大上限三个值,继续跟进父类看看设置了什么
// BaseGenericObjectPool#setConfig
protected void setConfig(final BaseObjectPoolConfig<T> config) {
setLifo(config.getLifo());
setMaxWait(config.getMaxWaitDuration());
setBlockWhenExhausted(config.getBlockWhenExhausted());
setTestOnCreate(config.getTestOnCreate());
setTestOnBorrow(config.getTestOnBorrow());
setTestOnReturn(config.getTestOnReturn());
setTestWhileIdle(config.getTestWhileIdle());
setNumTestsPerEvictionRun(config.getNumTestsPerEvictionRun());
setMinEvictableIdleDuration(config.getMinEvictableIdleDuration());
setDurationBetweenEvictionRuns(config.getDurationBetweenEvictionRuns());
setSoftMinEvictableIdleDuration(config.getSoftMinEvictableIdleDuration());
final EvictionPolicy<T> policy = config.getEvictionPolicy();
if (policy == null) {
// Use the class name (pre-2.6.0 compatible)
setEvictionPolicyClassName(config.getEvictionPolicyClassName());
} else {
// Otherwise, use the class (2.6.0 feature)
setEvictionPolicy(policy);
}
setEvictorShutdownTimeout(config.getEvictorShutdownTimeoutDuration());
}
这里大部分都是在set,但是有一个点隐藏在里面:setDurationBetweenEvictionRuns(config.getDurationBetweenEvictionRuns());
进入对应方法:
public final void setDurationBetweenEvictionRuns(final Duration timeBetweenEvictionRuns) {
this.durationBetweenEvictionRuns = PoolImplUtils.nonNull(timeBetweenEvictionRuns, BaseObjectPoolConfig.DEFAULT_DURATION_BETWEEN_EVICTION_RUNS);
startEvictor(this.durationBetweenEvictionRuns);
}
发现除了设置成员属性,还启动了一个驱逐器,跟进startEvictor方法
startEvictor
final void startEvictor(final Duration delay) {
synchronized (evictionLock) {
final boolean isPositiverDelay = PoolImplUtils.isPositive(delay);
if (evictor == null) { // Starting evictor for the first time or after a cancel
if (isPositiverDelay) { // Starting new evictor
evictor = new Evictor();
EvictionTimer.schedule(evictor, delay, delay);
}
} ……
}
核心在Evictor,并且通过一个ScheduledThreadPoolExecutor执行任务,可见Evictor应该是一个Runnable实现类,可以继续跟进下
Evictor#run
Evictor是继承自父类BaseGenericObjectPool的内部类,是Runnable的实现类
class Evictor implements Runnable
因此看run方法做了什么
public void run() {
final ClassLoader savedClassLoader = Thread.currentThread().getContextClassLoader();
try {
……
// Evict from the pool
try {
evict();
} ……
// Re-create idle instances.
try {
ensureMinIdle();
} ……
} finally {
// Restore the previous CCL
Thread.currentThread().setContextClassLoader(savedClassLoader);
}
}
核心是这两行:
evict
final EvictionConfig evictionConfig = new EvictionConfig(
getMinEvictableIdleDuration(),
getSoftMinEvictableIdleDuration(),
getMinIdle());
构造驱逐配置,这里的三个配置分别是对象池config中的minEvictableIdleDuration、softMinEvictableIdleDuration、minIdle和maxIdle中的最小值
final boolean testWhileIdle = getTestWhileIdle();
这里又取了testWhileIdle属性
for (int i = 0, m = getNumTests(); i < m; i++) {
if (evictionIterator == null || !evictionIterator.hasNext()) {
evictionIterator = new EvictionIterator(idleObjects);
}
……
underTest = evictionIterator.next();
这里做了一个循环,循环基于空闲对象列表的数量。
基于空闲对象队列构造了一个迭代器,迭代器为空的场景不谈,从迭代器中取出下一个需要校验的对象underTest
if (!underTest.startEvictionTest()) {
i--;
continue;
}
这里调用PoolObject对象的startEvictionTest方法,如果不需要驱逐,直接continue,不再对该对象做操作
evict = evictionPolicy.evict(evictionConfig, underTest, idleObjects.size());
这里做了一个判断,看下里面具体是怎么判断的
// -- DefaultEvictionPolicy#evict --
return (config.getIdleSoftEvictDuration().compareTo(underTest.getIdleDuration()) < 0 &&
config.getMinIdle() < idleCount) ||
config.getIdleEvictDuration().compareTo(underTest.getIdleDuration()) < 0;
对于可驱逐的对象,||语句的左半部分判断的是:
第一个条件判断idleSoftEvictDuration和idleDuration的大小,即配置项配置的允许空闲时间和池对象实际的空闲时间大小,当不进行配置,idleSoftEvictDuration默认值为-1,这个判断恒成立
第二个条件判断设置的minIdle属性和idleCount,即设置的允许空闲的最小数量,和当前空闲队列对象的量,当设置属性比当前队列数量小,返回true
||语句的右半部分判断的是当前对象的空闲时间和允许空闲的时间:
idleEvictDuration取的是配置中的允许空闲时间,与当前对象实际的空闲实际做对比
根据上面的判断可以分析出:要么队列大小已经超过当前设置的空闲数量,要么单个对象的空闲时间已经超过了当前设置的允许空闲时间,都会对对象产生驱逐
if (evict) {
destroy(underTest, DestroyMode.NORMAL);
这里判断上一步得到的是否驱逐的结论,如果驱逐,执行destroy方法进行驱逐,否则验证刚刚testWhileIdle属性如果是true,表示这个空闲对象需要做激活和校验
if (testWhileIdle) {
……
factory.activateObject(underTest);
active = true;
} catch (final Exception e) {
destroy(underTest, DestroyMode.NORMAL);
destroyedByEvictorCount.incrementAndGet();
默认的factory.activateObject(underTest);
是没有任何操作的,而jedis实现的连接池中是通过jedis.select(db)
的操作确认返回码,还是报错,确认当前连接是否还可用
// JedisFactory#activateObject
public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
final BinaryJedis jedis = pooledJedis.getObject();
if (jedis.getDB() != clientConfig.getDatabase()) {
jedis.select(clientConfig.getDatabase());
}
}
如果在激活同时判断连接的时候报错了,这里就判断为需要销毁的继续执行驱逐方法,如果激活没问题,再进行校验,这里是第二次判断是否驱逐了,是根据对象实际的业务状态进行判断
if (active) {
……
validate = factory.validateObject(underTest);
默认的factory.validateObject(underTest)
也是恒返回true的,而在jedis实现的连接池中,是通过ping操作去实现的
// JedisFactory#validateObject
public boolean validateObject(PooledObject<Jedis> pooledJedis) {
final BinaryJedis jedis = pooledJedis.getObject();
try {
String host = jedisSocketFactory.getHost();
int port = jedisSocketFactory.getPort();
String connectionHost = jedis.getClient().getHost();
int connectionPort = jedis.getClient().getPort();
return host.equals(connectionHost)
&& port == connectionPort && jedis.isConnected()
&& jedis.ping().equals("PONG");
} catch (final Exception e) {
logger.error("Error while validating pooled Jedis object.", e);
return false;
}
}
if (!validate) {
destroy(underTest, DestroyMode.NORMAL);
destroyedByEvictorCount.incrementAndGet();
} else {
……
factory.passivateObject(underTest);
如果激活校验失败了,这里依然是要做驱逐操作的,如果校验成功了,再取消对象的激活状态,这里默认实现factory.passivateObject(underTest);
也是不做任何操作,同样jedis也没做任何操作。
ensureIdle - 确认空闲数量并创建
while (idleObjects.size() < idleCount) {
final PooledObject<T> p = create();
条件是空闲对象列表元素数量小于预期的idleCount,如果是驱逐同时新建的流程,这里是通过getMinIdle方法获取到的设置的最小空闲数量minIdle属性。
符合条件调用create方法创建池中对象
if (getLifo()) {
idleObjects.addFirst(p);
} else {
idleObjects.addLast(p);
}
这里就比较简单了,是判断用头插还是尾插
destory - 销毁对象
idleObjects.remove(toDestroy);
allObjects.remove(new IdentityWrapper<>(toDestroy.getObject()));
主要操作idleObjects和allObjects两个列表/map
执行factory的回调函数
factory.destroyObject(toDestroy, destroyMode);
同时对计数器做操作
create - 构造对象
int localMaxTotal = getMaxTotal();
首先取配置的maxTotal属性,这个属性绝对了池子中连接数量上限
final long newCreateCount = createCount.incrementAndGet();
if (newCreateCount > localMaxTotal) {
createCount.decrementAndGet();
这里先利用计数器执行数量增加,如果增加完了发现数量超了,再执行数量下调,先利用计数器操作的好处在于避免操作对象列表
判断可以创建后,执行创建和测试操作
p = factory.makeObject();
if (getTestOnCreate() && !factory.validateObject(p)) {
createCount.decrementAndGet();
return null;
}
如果测试不通,获取到任何异常,执行计数器下调操作
createCount.decrementAndGet();
创建完成一切正常的话,把新对象加入到allObject中
allObjects.put(new IdentityWrapper<>(p.getObject()), p);
borrowObject - 从对象池中取出对象
对象池提供给调用方使用的核心方法
执行borrow前有一个触发最大阈值的丢弃逻辑
if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) &&
(getNumActive() > getMaxTotal() - 3)) {
removeAbandoned(ac);
}
可以看到校验点:
开启AbandonedOnBorrow配置
空闲数量<2
活跃数量>最大活跃数量-3,这里使用了一个核心对象池配置:maxTotal
符合条件执行removeAbandoned方法进行丢弃,然后开始执行borrow
首先检查对象池开启状态,以及检查池子中的废弃对象并丢弃。
PooledObject<T> p = null;
final boolean blockWhenExhausted = getBlockWhenExhausted();
准备工作:一个池对象p,一个标志取不到是否阻塞的属性blockWhenExhausted
p = idleObjects.pollFirst();
if (p == null) {
p = create();
if (p != null) {
create = true;
}
}
首先从空闲队列中取第一个对象,如果取不到,调用create方法创建一个新对象,如果创造出来了,直接跳到后面不判断blockWhenExhausted
如果创造不出来:
if (blockWhenExhausted) {
if (PooledObject.isNull(p)) {
p = borrowMaxWaitDuration.isNegative() ? idleObjects.takeFirst() : idleObjects.pollFirst(borrowMaxWaitDuration);
}
if (PooledObject.isNull(p)) {
throw new NoSuchElementException(appendStats(
"Timeout waiting for idle object, borrowMaxWaitDuration=" + borrowMaxWaitDuration));
}
} else if (PooledObject.isNull(p)) {
throw new NoSuchElementException(appendStats("Pool exhausted"));
}
这一段,先判断blockWhenExhausted属性,如果是true,即取不出来需要阻塞:
当borrowMaxWaitDuration属性为负,即不做周期性获取的时候,执行blockDequeue的takeFirst方法取第一个,这里是使用Condition做的条件阻塞,取不到则await等待signal唤醒,是双端队列的特性
当borrowMaxWaitDuration属性非负,即做周期性获取的时候,执行pollFirst方法加一个周期
如果阻塞方法取出来也是null,抛异常
如果blocakWhenExhausted属性为false,则上面取不出来也创建不出来,不阻塞了,直接抛异常
这里使用的idleObjects是一个双端阻塞队列:
if (!p.allocate()) {
p = null;
}
这里不管是之前取的还是create的还是阻塞取的,这里取到之后要改下状态,改不掉,让p为null
if (p != null) {
……
factory.activateObject(p);
……
……
destroy(p, DestroyMode.NORMAL);
if (p != null && getTestOnBorrow()) {
……
validate = factory.validateObject(p);
p非null时说明取到了,做激活、测试,如果出问题就销毁
updateStatsBorrow(p, Duration.ofMillis(System.currentTimeMillis() - waitTimeMillis));
更新对象池状态和记录时间
return p.getObject();
最后返回对象池中封装的内容
removeAbandoned
private void removeAbandoned(final AbandonedConfig abandonedConfig) {
// Generate a list of abandoned objects to remove
final ArrayList<PooledObject<T>> remove = createRemoveList(abandonedConfig, allObjects);
// Now remove the abandoned objects
remove.forEach(pooledObject -> {
……
try {
invalidateObject(pooledObject.getObject(), DestroyMode.ABANDONED);
} catch ……
});
}
回收那些被取走的,但是超过了很长时间没有被使用的(被遗弃的)对象。首先构造待遗弃的列表,然后遍历执行invalidateObject进行销毁
跟进createRemoveList方法看下
createRemoveList
继承自父类BaseGenericObjectPool
final Instant timeout = Instant.now().minus(abandonedConfig.getRemoveAbandonedTimeoutDuration());
首先获取一个核心属性:丢弃对象的超时时间abandonedTimeoutDuration,默认是300s,即以当前向前推300s
synchronized (pooledObject) {
if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
pooledObject.getLastUsedInstant().compareTo(timeout) <= 0) {
pooledObject.markAbandoned();
remove.add(pooledObject);
}
}
根据这个if,判定该对象需要销毁的条件包括:
对象处于ALLOCATED状态,也就是使用中状态(使用中不一定就是执行中,也不一定真的还在被使用)
上次被使用的时间比过期时间还小,说明上次被使用时间更久了,早就超过300s没用了
符合这两个条件就要被加入到remove队列中
returnObject - 归还池对象
pool2提供给调用方的核心方法,归还
有借有还,归还之后调用blockDequeue中的Condition的singal方法,激活阻塞的borrow线程
final PooledObject<T> p = getPooledObject(obj);
封装成池对象
markReturningState(p);
这里是现标记成归还中的状态
final Duration activeTime = p.getActiveDuration();
这里是记录时间状态
if (getTestOnReturn() && !factory.validateObject(p)) {
……
destroy(p, DestroyMode.NORMAL);
……
ensureIdle(1, false);
……
updateStatsReturn(activeTime);
return;
}
这里校验一下要归还的对象,如果直接是个不能用的了,就不归还了,直接销毁,然后通过ensureIdle方法创建一个新的空闲对象进去,同时给他更新归还状态
try {
factory.passivateObject(p);
} catch (final Exception e1) {
……
destroy(p, DestroyMode.NORMAL);
……
ensureIdle(1, false);
……
updateStatsReturn(activeTime);
return;
}
校验完了还要取消归还对象的激活状态,在取消激活的时候如果出了异常,也是直接不归还了,重新创建一个新的对象,同时把时间和状态记录给他
if (!p.deallocate()) {
throw new IllegalStateException(……);
}
取消激活后,将归还对象的状态重置为空闲,如果不成功,抛异常
final int maxIdleSave = getMaxIdle();
if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
……
destroy(p, DestroyMode.NORMAL);
……
ensureIdle(1, false);
……
取出配置的最大空闲对象数量,如果当前空闲队列长度已经大于配置了,直接销毁归还对象,同时执行创建方法,如果创建也进不去了,处理下异常结束。
} else {
if (getLifo()) {
idleObjects.addFirst(p);
} else {
idleObjects.addLast(p);
}
if (isClosed()) {
……
clear();
}
}
如果空闲队列长度还有空余,这里取一下头插还是尾插的配置,调用阻塞队列的头插/尾插方法,在调用的同时会让blockingDequeue中的Condition同时发一个signal信号给阻塞的borrow线程,激活borrow流程,同时处理一下如果对象池关闭的场景
updateStatsReturn(activeTime);
最后做归还后的状态记录
DefaultPoolObject - 池中对象
再看下Pool2对池中对象的封装
池中对象具备的核心属性包括:
// 标志该对象的状态,IDLE为空闲即在队列但未被使用;ALLOCATED为在使用;EVICTION为即将
// 被驱逐;VALIDATION为在队列中,可用状态;INVALID为即将被销毁
private PooledObjectState state = PooledObjectState.IDLE;
// 记录上次被归还的时间点,被构造的时候默认存入构造时间点
private volatile Instant lastReturnInstant = createInstant;
startEvictionTest - 进行可驱逐性测试
public synchronized boolean startEvictionTest() {
if (state == PooledObjectState.IDLE) {
state = PooledObjectState.EVICTION;
return true;
}
return false;
}
判断状态为IDLE空闲态,则修改为EVICTION待驱逐,同时返回true
getIdleDuration - 获取空闲周期
final Duration elapsed = Duration.between(lastReturnInstant, now());
return elapsed.isNegative() ? Duration.ZERO : elapsed;
根据代码可以判断是获取上次被归还时间点到现在之间的时间差
PooledObjectFactory - 提供对池对象的操作能力的接口
使用pool2必须要实现PooledObjectFactory,提供创建、销毁、激活、校验等能力
public interface PooledObjectFactory<T> {
void activateObject(PooledObject<T> p) throws Exception;
void destroyObject(PooledObject<T> p) throws Exception;
default void destroyObject(final PooledObject<T> p, final DestroyMode destroyMode) throws Exception {
destroyObject(p);
}
PooledObject<T> makeObject() throws Exception;
void passivateObject(PooledObject<T> p) throws Exception;
boolean validateObject(PooledObject<T> p);
}
评论区