本文共 14117 字,大约阅读时间需要 47 分钟。
接下来,本文将介绍 Mybatis 缓存的实现原理,具体分析如下几个问题
解析 Mapper 配置文件,或者解析标注有 CacheNamespace 注解 Mapper 的时候,均会调用下面的方法。
由 MapperBuilderAssistant 创建缓存 org.apache.ibatis.builder.MapperBuilderAssistant#useNewCachepublic Cache useNewCache(Class typeClass, Class evictionClass, Long flushInterval, Integer size, boolean readWrite, boolean blocking, Properties props) { Cache cache = new CacheBuilder(currentNamespace) .implementation(valueOrDefault(typeClass, PerpetualCache.class)) .addDecorator(valueOrDefault(evictionClass, LruCache.class)) .clearInterval(flushInterval) .size(size) .readWrite(readWrite) .blocking(blocking) .properties(props) .build(); configuration.addCache(cache); currentCache = cache; return cache; }
可以看到 Mybatis 使用了装饰者模式,默认使用 PerpetualCache 作为缓存,默认使用 Lru 缓存策略。
public Cache build() { setDefaultImplementations(); Cache cache = newBaseCacheInstance(implementation, id); setCacheProperties(cache); // issue #352, do not apply decorators to custom caches if (PerpetualCache.class.equals(cache.getClass())) { for (Class decorator : decorators) { cache = newCacheDecoratorInstance(decorator, cache); setCacheProperties(cache); } cache = setStandardDecorators(cache); } else if (!LoggingCache.class.isAssignableFrom(cache.getClass())) { cache = new LoggingCache(cache); } return cache; }
步骤1:设置默认实现类(PerpetualCache+LruCache)
步骤2:按照 ID 创建 Cache(ID为 Mapper 接口的全类名) 步骤3:设置缓存的参数 步骤4:自定义的缓存不应用标准的装饰器(定时刷新、只读、同步、阻塞等需要自己实现),仅包装一层日志 步骤5:设置标准的装饰器private Cache setStandardDecorators(Cache cache) { try { MetaObject metaCache = SystemMetaObject.forObject(cache); if (size != null && metaCache.hasSetter("size")) { metaCache.setValue("size", size); } if (clearInterval != null) { cache = new ScheduledCache(cache); ((ScheduledCache) cache).setClearInterval(clearInterval); } if (readWrite) { cache = new SerializedCache(cache); } cache = new LoggingCache(cache); // 在获取更新缓存时防止并发 cache = new SynchronizedCache(cache); if (blocking) { // 当在缓存中找不到时,阻塞进程 cache = new BlockingCache(cache); } return cache; } catch (Exception e) { throw new CacheException("Error building standard cache decorators. Cause: " + e, e); } }
开启了全局缓存,创建 SqlSession 时就会创建一个 CachingExecutor。
提一句:开启一个 SqlSession 可以配置默认的执行器。SIMPLE 就是普通的执行器;REUSE 执行器会重用预处理语句(PreparedStatement); BATCH 执行器不仅重用语句还会执行批量更新。
public Executor newExecutor(Transaction transaction, ExecutorType executorType) { executorType = executorType == null ? defaultExecutorType : executorType; executorType = executorType == null ? ExecutorType.SIMPLE : executorType; Executor executor; if (ExecutorType.BATCH == executorType) { executor = new BatchExecutor(this, transaction); } else if (ExecutorType.REUSE == executorType) { executor = new ReuseExecutor(this, transaction); } else { executor = new SimpleExecutor(this, transaction); } if (cacheEnabled) { executor = new CachingExecutor(executor); } executor = (Executor) interceptorChain.pluginAll(executor); return executor; }
如果自定义了 executor 插件,就对 executor 进行代理。
不管 executor 有多少层代理,最终执行查询 SQL 语句的时候,都会调用如下的方法
org.apache.ibatis.executor.CachingExecutor#query(…)@Override publicList query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException { // 获取SQL语句 BoundSql boundSql = ms.getBoundSql(parameterObject); // 根据SQL语句创建 CacheKey ,作为缓存的主键,之后按照这个主键去缓存中查找。 CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql); return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql); }
缓存的主键是如何创建的呢,看下面的方法。
@Override public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) { if (closed) { throw new ExecutorException("Executor was closed."); } CacheKey cacheKey = new CacheKey(); cacheKey.update(ms.getId()); cacheKey.update(rowBounds.getOffset()); cacheKey.update(rowBounds.getLimit()); cacheKey.update(boundSql.getSql()); ListparameterMappings = boundSql.getParameterMappings(); TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry(); // mimic DefaultParameterHandler logic for (ParameterMapping parameterMapping : parameterMappings) { if (parameterMapping.getMode() != ParameterMode.OUT) { Object value; String propertyName = parameterMapping.getProperty(); if (boundSql.hasAdditionalParameter(propertyName)) { value = boundSql.getAdditionalParameter(propertyName); } else if (parameterObject == null) { value = null; } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) { value = parameterObject; } else { MetaObject metaObject = configuration.newMetaObject(parameterObject); value = metaObject.getValue(propertyName); } cacheKey.update(value); } } if (configuration.getEnvironment() != null) { // issue #176 cacheKey.update(configuration.getEnvironment().getId()); } return cacheKey; }
缓存的 KEY 的创建很简单,以如下信息拼接而成:SQL 方法名称+分页信息+具体的SQL+具体的输入参数+执行环境
打印 CacheKey.toString(),示例输出如下(以:隔开各个元素):931220670:1409391160:org.apache.ibatis.autoconstructor.AutoConstructorMapper.getSubject:0:2147483647:SELECT * FROM subject WHERE id = ?:1:development
此方法是实现二级缓存的关键
@Override publicList query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { Cache cache = ms.getCache(); if (cache != null) { flushCacheIfRequired(ms); if (ms.isUseCache() && resultHandler == null) { ensureNoOutParams(ms, boundSql); @SuppressWarnings("unchecked") List list = (List ) tcm.getObject(cache, key); if (list == null) { list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql); tcm.putObject(cache, key, list); // issue #578 and #116 } return list; } } return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql); }
步骤一:通过 MappedStatement 取二级缓存,由此可知二缓存默认存储在 MappedStatement 中。如果不存在,调用 delegate.query() 从一级缓存中取。
步骤二:判断是否需要刷新缓存,是的话需要清空缓存。可能有其他连接执行了修改或者删除,亦或标记了刷新缓存注解 @Options(flushCache = FlushCachePolicy.TRUE) 。 步骤三:判断是否是 CallableStatement ,不支持使用 OUT 参数缓存存储过程。 步骤四:从事务缓存管理中取缓存,此处之后具体分析。 步骤五:取到缓存直接返回,不执行 SQL,不存在调用 delegate.query() ,并缓存结果到缓存。上面有一个很关键的方法 tcm.getObject(cache, key) ,两个参数,一个是缓存 cache,一个是 key,通过事务缓存管理以 key 从 cache 中读取数据。
public class TransactionalCacheManager { private final MaptransactionalCaches = new HashMap<>(); public Object getObject(Cache cache, CacheKey key) { return getTransactionalCache(cache).getObject(key); } private TransactionalCache getTransactionalCache(Cache cache) { return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new); } ......}
通过 TransactionalCache 获取 key。
org.apache.ibatis.cache.decorators.TransactionalCache#getObject@Override public Object getObject(Object key) { // issue #116 Object object = delegate.getObject(key); if (object == null) { entriesMissedInCache.add(key); } // issue #146 if (clearOnCommit) { return null; } else { return object; } }
每一个二级缓存,均有一个 TransactionalCache(二级缓存的事务buffer) 对应。然后依次调用代理的缓存实现,直到最底层的缓存实现类 PerpetualCache。
如下图的调用栈: 因之前有连接查询过,这里会有缓存结果。如果二级缓存获取不到数据,再进入 delegate.query 方法,调用 BaseExecutor 获取一级缓存
public abstract class BaseExecutor implements Executor { protected PerpetualCache localCache = new PerpetualCache("LocalCache"); ......}
@Override publicList query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId()); if (closed) { throw new ExecutorException("Executor was closed."); } if (queryStack == 0 && ms.isFlushCacheRequired()) { clearLocalCache(); } List list; try { queryStack++; list = resultHandler == null ? (List ) localCache.getObject(key) : null; if (list != null) { handleLocallyCachedOutputParameters(ms, key, parameter, boundSql); } else { list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql); } } finally { queryStack--; } if (queryStack == 0) { for (DeferredLoad deferredLoad : deferredLoads) { deferredLoad.load(); } // issue #601 deferredLoads.clear(); if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) { // issue #482 clearLocalCache(); } } return list; } private List queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { List list; localCache.putObject(key, EXECUTION_PLACEHOLDER); try { list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql); } finally { localCache.removeObject(key); } localCache.putObject(key, list); if (ms.getStatementType() == StatementType.CALLABLE) { localOutputParameterCache.putObject(key, parameter); } return list; }
实现逻辑,大家自己看吧。
不管是修改,还是删除,在调用 Executor 前都会先清空二级缓存(除非方法上标记不刷缓存)
org.apache.ibatis.executor.CachingExecutor#update@Override public int update(MappedStatement ms, Object parameterObject) throws SQLException { flushCacheIfRequired(ms); return delegate.update(ms, parameterObject); } private void flushCacheIfRequired(MappedStatement ms) { Cache cache = ms.getCache(); if (cache != null && ms.isFlushCacheRequired()) { tcm.clear(cache); } }
最终调用 TransactionalCache,标记事务结束的时候,需要清空真正的缓存,这里只是清空待提交的查询结果缓存。
@Override public void clear() { clearOnCommit = true; entriesToAddOnCommit.clear(); }
调用 Executor 时,执行SQL前,再清空一级缓存
org.apache.ibatis.executor.BaseExecutor#update@Override public int update(MappedStatement ms, Object parameter) throws SQLException { ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId()); if (closed) { throw new ExecutorException("Executor was closed."); } clearLocalCache(); return doUpdate(ms, parameter); }
事务结束后,判断是 commit 还是 rollback。
如果是 commit ,先判断是否要清空待提交的查询结果缓存,再将待提交的查询结果缓存写入到真正的缓存 PerpetualCache 中。public void commit() { if (clearOnCommit) { delegate.clear(); } flushPendingEntries(); reset(); } private void flushPendingEntries() { for (Map.Entry
如果是 rollback ,将这个事务管理的多个 cache 全部清空。
public void rollback() { for (TransactionalCache txCache : transactionalCaches.values()) { txCache.rollback(); } } public void rollback() { unlockMissedEntries(); reset(); } private void unlockMissedEntries() { for (Object entry : entriesMissedInCache) { try { // 清空缓存,失效 key,其他线程可重新从数据库查询 delegate.removeObject(entry); } catch (Exception e) { ... } } } private void reset() { clearOnCommit = false; entriesToAddOnCommit.clear(); entriesMissedInCache.clear(); }
如果一个事务修改了表,之后又查询了表,事务正常commit之后,缓存还存在吗?
存在。修改会清空 entriesToAddOnCommit,然后标记需要清空真正的缓存;
接下来,查询会缓存结果到 entriesToAddOnCommit; 事务 commit 时会清空 delegate,之后又会将 entriesToAddOnCommit 写入到 delegate,最后清空 entriesToAddOnCommit,还原状态 clearOnCommit 。大家可能看到了 TransactionalCache 中还有一个属性 entriesMissedInCache,它是干嘛的呢?
它是为了支持 BlockingCache 用的,如果一个线程1调用查询获取不到数据,BlockingCache 会阻塞其他线程,直到线程1获取到结果放入到缓存,这样避免了当获取不到数据时其他线程全部从数据库读数据。
线程1获取到数据后,会释放 key 的重入锁,如果获取不到数据呢,其他线程依旧阻塞吗?不是的,如果当前线程1的事务结束了,是一定要释放 key 的重入锁的,这时需要唤醒其他线程。 看 org.apache.ibatis.cache.decorators.TransactionalCache#getObject 方法@Override public Object getObject(Object key) { // issue #116 Object object = delegate.getObject(key); if (object == null) { entriesMissedInCache.add(key); } // issue #146 if (clearOnCommit) { return null; } else { return object; } }
获取不到记录,记录此查询的 key。方便事务提交的时候,释放此 key 的锁。
事务结束,执行 flushPendingEntries 或者 unlockMissedEntries 释放锁。如果线程1正常查询,无修改操作,其他线程也无修改,但获取不到值,会执行 flushPendingEntries 方法中的 delegate.putObject(entry, null) 吗?这样唤醒其他线程,其他线程还是会回到数据库执行查询,然后调用 delegate.putObject(entry, null) 吗?这样岂不是 BlockingCache 严重影响了查询效率?
不会,查询返回值不会为空,一定会返回一个 List 对象,保证缓存一个值。delegate.putObject(entry, null) 只是为了保证唤醒线程。
最后回答开头提出的几个问题。
这个简单,通过查询语句,按照固定的格式拼接 key,拿这个 key 从 MappedStatement 获取缓存,获取得到即存在缓存。
定义一个事务管理的缓存Buffer TransactionalCache,维护一个 Map<Object, Object> entriesToAddOnCommit 对象 和一个 clearOnCommit 清空缓存标识;
entriesToAddOnCommit 中 key 为拼接的查询语句的那个 key,value 为事务结束前 key 的查询结果; 根据事务的结束状态,判断事务的查询缓存是否需要提交,以及已存在的二级缓存是否需要失效。转载地址:http://vmrai.baihongyu.com/