目 录CONTENT

文章目录

spring-transaction事务

FatFish1
2025-02-09 / 0 评论 / 0 点赞 / 57 阅读 / 0 字 / 正在检测是否收录...

spring事务配置

<!-- 补充命名空间:xmlns:tx=http://www.Springframework.org/schema/tx-->
<!-- 补充schemaLocations:http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd-->

<tx:annotation-driven transaction-manager="transactionManager" />
<bean id="transactionManager" class="org.Springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
</bean>

使用<tx:annotation-driven/>标签开启事务

将TransactionManager添加到IOC中管理,并向其中注入dataSource,即可对对应的dataSource进行事务管理。

在这种情况下,当sql执行出现异常时,spring将自行抛出异常同时将执行的sql进行回滚。

默认情况下Spring中的事务处理只对RuntimeException⽅法进⾏回滚,所以,如果将Runtime Exception替换成普通的Exception不会产⽣回滚效果。

spring事务创建流程

spring事务创建流程是基于spring-AOP实现的

http://www.chymfatfish.cn/archives/spring-aop

同时因为有自定义的标签<tx:annotation-driven/> ,还会涉及到自定义标签解析流程

http://www.chymfatfish.cn/archives/springxml#%E8%87%AA%E5%AE%9A%E4%B9%89%E6%A0%87%E7%AD%BE%E8%A7%A3%E6%9E%90%E7%9B%B8%E5%85%B3%E7%B1%BB

梳理其整体逻辑为:

  1. 通过AnnotationDrivenBeanDefinitionParser加载自定义标签<tx:annotation-driven/> ,同时将BeanFactoryTransactionAttributeSourceAdvisor、AnnotationTransactionAttributeSource、TransactionInterceptor加载到上下文

  2. 通过重写的AOP生成器InfrastructureAdvisorAutoProxyCreator#postProcessAfterInitialization 方法启动加载流程,跳转到wrapIfNecessary方法,执行AOP加载两大步骤:findAdvisor、createProxy

    1. 找到BeanFactoryTransactionAttributeSourceAdvisor,并通过其中的TransactionAttributeSourcePointcut执行AOP的canApply校验,判断适配事务切面,包括@Transaction校验、public校验等

    2. 在createProxy的流程中,先把BeanFactoryTransactionAttributeSourceAdvisor封装进ProxyFactory继承来的advisors数组中,然后使用jdk代理创建proxy,在invoke方法中构造处理链,先从前面的advisors数组中取出在BeanFactoryTransactionAttributeSourceAdvisor,再调用BeanFactoryTransactionAttributeSourceAdvisor#getAdvice 取出TransactionInterceptor,将其加入AOP处理链中,执行其中的invoke方法

spring事务自定义标签解析和AOP增强相关类

在spring-tx包下面META-INF目录找到了spring.handlers和spring.schemas

https\://www.springframework.org/schema/tx/spring-tx.xsd=org/springframework/transaction/config/spring-tx.xsd

首先找到自定义标签xsd位置

http\://www.springframework.org/schema/tx=org.springframework.transaction.config.TxNamespaceHandler

然后是自定义标签解析对应的类TxNamespaceHandler

TxNamespaceHandler

public void init() {
    registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
    registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
    registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}

在这个handler中注册了三个parser分别解析三种标签,重点关注用于解析annotation-driven 的AnnotationDrivenBeanDefinitionParser

AnnotationDrivenBeanDefinitionParser

parse

parse是核心方法,方法主体是在做if-else分类,该方法解析了配置中的mode属性,如果是aspectj则使用Aspect切面进行解析,否则使用代理proxy的逻辑解析。

if ("aspectj".equals(mode)) {
	registerTransactionAspect(element, parserContext);
    ……
}
else {
	AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
}

即如果存在mode配置,事务配置将写作:

<tx:annotation-driven transaction-manager="transactionManager" mode=”aspectj”/>

这时使用Aspect切面进行解析,否则使用代理proxy的逻辑解析,重点关注proxy流程

AopAutoProxyConfigurer#configureAutoProxyCreator

AnnotationDrivenBeanDefinitionParser的内部类方法

在这个骨架方法中,主要做了以下几件事:

  • 把InfrastructureAdvisorAutoProxyCreator加载到IOC

  • 加载AnnotationTransactionAttributeSource实例

  • 加载TransactionInterceptor实例

  • 加载BeanFactoryTransactionAttributeSourceAdvisor

  • 把AnnotationTransactionAttributeSource和TransactionInterceptor配置到BeanFactoryTransactionAttributeSourceAdvisor中

  • 将上面三个Bean注册进IOC中

加载InfrastructureAdvisorAutoProxyCreator到IOC
AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);

这里好像注册了什么东西,继续跟进到AopNamespaceUtils#registerAutoProxyCreatorIfNecessary

public static void registerAutoProxyCreatorIfNecessary(
       ParserContext parserContext, Element sourceElement) {
    BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary(
          parserContext.getRegistry(), parserContext.extractSource(sourceElement));
    ……
}

然后到AopConfigUtils#registerAutoProxyCreatorIfNecessary

public static BeanDefinition registerAutoProxyCreatorIfNecessary(
       BeanDefinitionRegistry registry, @Nullable Object source) {
    return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}

继续跟进到AopConfigUtils#registerOrEscalateApcAsRequired

private static BeanDefinition registerOrEscalateApcAsRequired(
       Class<?> cls, BeanDefinitionRegistry registry, @Nullable Object source) {
    Assert.notNull(registry, "BeanDefinitionRegistry must not be null");
    if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
       BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
       if (!cls.getName().equals(apcDefinition.getBeanClassName())) {
          int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName());
          int requiredPriority = findPriorityForClass(cls);
          if (currentPriority < requiredPriority) {
             apcDefinition.setBeanClassName(cls.getName());
          }
       }
       return null;
    }
    ……
    registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition);
    return beanDefinition;
}

最终发现里面想要注册的是InfrastuctureAdvisorAutoProxyCreator的实例,它是AOP切面生成器,和AOP部分的AnnotationAwareAspectJAutoProxyCreator是类似的

http://www.chymfatfish.cn/archives/spring-aop#annotationawareaspectjautoproxycreator

但是如果同时开启了AOP的时候,根据优先级,可能统一注册成AnnotationAwareAspectJAutoProxyCreator

加载BeanFactoryTransactionAttributeSourceAdvisor同时加载AnnotationTransactionAttributeSource和TransactionInterceptor实例
RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);TransactionInterceptor
……
advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);

可见AnnotationTransactionAttributeSource是加到transactionAttributeSource属性中,这是BeanFactoryTransactionAttributeSourceAdvisor自己的属性,而TransactionInterceptor是增强器,加到其继承的父类属性adviceBeanName中

而在校验过程中可以发现,advisor是切面方法负责参与校验,真正的transaction执行相关就是Interceptor中的内容,详见spring事务增强器TransactionInterceptor部分

http://www.chymfatfish.cn/archives/spring-transactionshi#transactioninterceptor

InfrastructureAdvisorAutoProxyCreator

InfrastructureAdvisorAutoProxyCreator继承自AbstractAdvisorAutoProxyCreator,原来是AOP的加载启动类,负责为AOP加载Advisors和创建代理的。

同时它的父类实现BeanPostProcessor,说明它是一个可插入式的处理器,那么它的加载点就在ApplicationContext加载的时候,同时它的触发点在doCreateBean三部曲中

http://www.chymfatfish.cn/archives/spring-beansgetfromfactory#initializebean---%E5%88%9D%E5%A7%8B%E5%8C%96bean

因此在bean加载的时候会通过postProcessAfterInitialization调用进来,也会走到wrapIfNecessary方法。

在研究AOP的创建流程中,曾经研究过AbstractAdvisorAutoProxyCreator的另一个实现类AnnotationAwareAspectJAutoProxyCreator,还记得wrapIfNecessary方法有两个步骤:找到所有的切面方法和对应的切面类,创建成Advisors;然后创建代理

http://www.chymfatfish.cn/archives/spring-aop#annotationawareaspectjautoproxycreator

AbstractAutoProxyCreator#wrapIfNecessary,可以看到:

Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);

从这里向下找到AbstractAdvisorAutoProxyCreator#findEligibleAdvisors

protected List<Advisor> findEligibleAdvisors(Class<?> beanClass, String beanName) {
    List<Advisor> candidateAdvisors = findCandidateAdvisors();
    List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);
    extendAdvisors(eligibleAdvisors);
    if (!eligibleAdvisors.isEmpty()) {
       eligibleAdvisors = sortAdvisors(eligibleAdvisors);
    }
    return eligibleAdvisors;
}

还记得这个方法主要做了三个主要的事情(AOP重写的AnnotationAwareAspectJAutoProxyCreator是四个事情,事务这里没重写):找到所有的切面方法生成Advisor、查看哪些切面方法适配当前bean、按优先级排序。其中找到切面方法、生成Advisor的流程是一样的,可以参考AOP部分:

http://www.chymfatfish.cn/archives/spring-aop#findeligibleadvisors

当我们开启了事务配置,即增加了<tx:annotation-driven/> ,这里就会找到BeanFactoryTransactionAttributeSourceAdvisor的切面方法

这个切面方法是很眼熟的,就是在前面AopAutoProxyConfigurer#configureAutoProxyCreator方法中加载的三个bean中的最后一个

http://www.chymfatfish.cn/archives/spring-transactionshi#aopautoproxyconfigurer%23configureautoproxycreator

然后在findAdvisorsThatCanApply方法中做进一步解析,其中有一步涉及了后面的BeanFactoryTransactionAttributeSourceAdvisor

if (canApply(candidate, clazz, hasIntroductions)) {
    eligibleAdvisors.add(candidate);
}

这里canApply方法调用了以下方法:

return canApply(pca.getPointcut(), targetClass, hasIntroductions);

这里就是BeanFactoryTransactionAttributeSourceAdvisor#getPointcut方法,点我跳转

话说到最后,这个用于事务的AOP实现类到底自己实现了什么呢?其实它实现的主要是isEligibleAdvisorBean方法

protected boolean isEligibleAdvisorBean(String beanName) {
    return (this.beanFactory != null && this.beanFactory.containsBeanDefinition(beanName) &&
          this.beanFactory.getBeanDefinition(beanName).getRole() == BeanDefinition.ROLE_INFRASTRUCTURE);
}

也就是findEligibleAdvisors方法的第一步findCandidateAdvisors中调用的,可以参考BeanFactoryAdvisorRetrievalHelper#findAdvisorBeans 方法,其中之前在AOP部分看到过,有一个留给开发者自行实现的isEligibleBean方法,如下:

http://www.chymfatfish.cn/archives/spring-aop#findadvisorbeans

看完这块,再跟着代码思路看BeanFactoryTransactionAttributeSourceAdvisor

BeanFactoryTransactionAttributeSourceAdvisor

实现了AbstractBeanFactoryPointcutAdvisor,因此它是一个增强器类,它的核心成员变量有:

// 继承自增强器对应的beanName
private String adviceBeanName;
private BeanFactory beanFactory;
// 增强器中的切面方法
private transient volatile Advice advice;
private transient volatile Object adviceMonitor = new Object();
// 除了继承自父类的adviceBeanName、advice等,还有:
// 事务属性配置
private TransactionAttributeSource transactionAttributeSource;
// pointcut切点
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {...}

这个类本身是没有什么内容的,需要关注的是前面canApply哪里的一系列匹配逻辑,顺着代码思路来

首先BeanFactoryTransactionAttributeSourceAdvisor实现了getPointcut方法

public Pointcut getPointcut() {
    return this.pointcut;
}

因此,匹配的对象是成员变量pointcut的默认构造TransactionAttributeSourcePointcut

private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
    @Override
    @Nullable
    protected TransactionAttributeSource getTransactionAttributeSource() {
       return transactionAttributeSource;
    }
};

TransactionAttributeSourcePointcut

结合canApply的流程研究事务切面是如何匹配的,可以一遍回顾源码一边分析

http://www.chymfatfish.cn/archives/spring-aop#canapply(advisor-advisor%2C-class%3C%3F%3E-targetclass%2C-boolean-hasintroductions)

匹配点一、获取ClassFilter调用match方法

if (!pc.getClassFilter().matches(targetClass)) {
	return false;
}

看TransactionAttributeSourcePointcut的ClassFilter,是在默认构造中添加的

protected TransactionAttributeSourcePointcut() {
    setClassFilter(new TransactionAttributeSourceClassFilter());
}

而这个ClassFilter的matches方法实现如下:

private class TransactionAttributeSourceClassFilter implements ClassFilter {
    @Override
    public boolean matches(Class<?> clazz) {
       if (TransactionalProxy.class.isAssignableFrom(clazz) ||
             TransactionManager.class.isAssignableFrom(clazz) ||
             PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
          return false;
       }
       TransactionAttributeSource tas = getTransactionAttributeSource();
       return (tas == null || tas.isCandidateClass(clazz));
    }
}

这里getTransactionAttributeSource实际上取到的就是在AnnotationDrivenBeanDefinitionParser#parse方法(点我跳转)中存入的AnnotationTransactionAttributeSource了,实际上调用到了父类AbstractFallbackTransactionAttributeSource的方法

最后进入对应的isCandidateClass方法,但是追溯到底其实发现默认策略是一定可以匹配上的,那这里实际上就没有真正产生校验效力。

匹配点2、获取methodMatcher调用matches方法

MethodMatcher methodMatcher = pc.getMethodMatcher();
……
methodMatcher.matches(method, targetClass))

TransactionAttributeSourcePointcut的getMethodMatcher方法在父类中:

public final MethodMatcher getMethodMatcher() {
	return this;
}

也就是说调用的实际上是自己的matches方法:

public boolean matches(Method method, Class<?> targetClass) {
	TransactionAttributeSource tas = getTransactionAttributeSource();
	return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

这里还是通过AnnotationTransactionAttributeSource的getTransactionAttribute获取目标bean上面是否有@Transactional配置,如果没有,解析出来是null,则无法通过校验。

至此校验完成

事务增强器创建事务相关类

TransactionInterceptor

事务切面增强器,也是事务最底层实现逻辑的代码所在

调用流程

回顾AOP流程,最终会通过JdkDynamicAopProxy#getProxy生成一个JDK代理,真正执行代理的时候,调用invoke方法,会封装AOP处理链

// Get the interception chain for this method.
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

这里的advised还记得是ProxyCreatorSupport的实现类ProxyFactory,里面存储了封装的BeanFactoryTransactionAttributeSourceAdvisor,是在哪封装的,得回顾AbstractAutoProxyCreator#createProxy 方法

Advisor[] advisors = buildAdvisors(beanName, specificInterceptors);
proxyFactory.addAdvisors(advisors);
proxyFactory.setTargetSource(targetSource);

然后在AdvisedSupport#getInterceptorsAndDynamicInterceptionAdvice 一路向下找,到DefaultAdvisorChainFactory#getInterceptorsAndDynamicInterceptionAdvice 方法中,看到

Advisor[] advisors = config.getAdvisors();

即从前面构造的ProxyFactory中取出了封装的BeanFactoryTransactionAttributeSourceAdvisor

因为它是一个PointcutAdvisor的实现,因此它会走if (advisor instanceof PointcutAdvisor) 为true的场景,这时看到

MethodInterceptor[] interceptors = registry.getInterceptors(advisor);

进一步跟踪,发下如下逻辑:

public MethodInterceptor[] getInterceptors(Advisor advisor) throws UnknownAdviceTypeException {
    List<MethodInterceptor> interceptors = new ArrayList<>(3);
    Advice advice = advisor.getAdvice();
    if (advice instanceof MethodInterceptor) {
       interceptors.add((MethodInterceptor) advice);
    }

即调用BeanFactoryTransactionAttributeSourceAdvisor#getAdvice方法

public Advice getAdvice() {
    Assert.state(this.transactionInterceptor != null, "No TransactionInterceptor set");
    return this.transactionInterceptor;
}

到这里终于知道transactionInterceptor是怎么被使用的了,拿到这个东西,再结合AOP后面的逻辑,就可以知道实际上处理链里面执行的是TransactionInterceptor#invoke方法了

invoke

public Object invoke(MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
       ……
    });
}

这里调用的是invokeWithinTransaction,继续向下跟踪

invokeWithinTransaction

TransactionInterceptor继承自TransactionAspectSupport,是Transaction流程的主体框架,主要功能有以下几个步骤:

  • 获取事务的属性

  • 加载配置中配的transactionManager

  • 不同的事务处理方式使用不同的事务

    • 反应式事务

    • ※申明式事务(最常用)

    • 编程式事务

  • 执行事务

    • 创建事务并获取事务信息

    • 执行目标方法

    • 出现异常,根据与异常进行异常处理和回滚

    • 提交事务前清除事务信息

    • 提交事务

获取事务属性
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);

首先获取事务属性,这里getTransactionAttribute方法获取的是前面见过很多次的AnnotationTransactionAttributeSource,它们的关联也是在AopAutoProxyConfigurer#configureAutoProxyCreator 配置的,点我跳转

然后调用AnnotationTransactionAttributeSource#getTransactionAttribute ,这里调用的是继承自父类的AbstractFallbackTransactionAttributeSource#getTransactionAttribute

具体代码参考AnnotationTransactionAttributeSource

加载配置中配的transactionManager
final TransactionManager tm = determineTransactionManager(txAttr);

主要流程就是根据事务注解中的管理器name去找对应的管理器,如果没有name获取默认管理器。其中也涉及到缓存存取的逻辑

申明式事务主流程

暂不考虑反应式事务和编程式事务,申明式事务是日常开发最常用的事务模式,以@Transaction为配置方法

if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
    // 创建事务并获取事务信息
    TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
    Object retVal;
    // 执行原始方法,同时catch异常
    try {
       // This is an around advice: Invoke the next interceptor in the chain.
       // This will normally result in a target object being invoked.
       retVal = invocation.proceedWithInvocation();
    }
    catch (Throwable ex) {
       // target invocation exception
       completeTransactionAfterThrowing(txInfo, ex);
       throw ex;
    }
    finally {
       // 提交前清除事务信息,提交前不管成功与否,清除事务信息,主要是对当前线程中的ThreadLocal信息进行清除
       cleanupTransactionInfo(txInfo);
    }
    if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
       // Set rollback-only in case of Vavr failure matching our rollback rules...
       TransactionStatus status = txInfo.getTransactionStatus();
       if (status != null && txAttr != null) {
          retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
       }
    }
    // 提交事务
    commitTransactionAfterReturning(txInfo);
    return retVal;
}

这里主要的流程就是:

createTransactionIfNecessary

这个方法主要是判断是否有必要创建事务

if (txAttr != null) {
	……
		status = tm.getTransaction(txAttr);
	……
	……
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);

这里两个大步骤,首先调用PlatformTransactionManager#getTransaction获取事务,参考PlatformTransactionManager

然后加载事务信息,prepareTransactionInfo方法在父类TransactionAspectSupport的代码中

prepareTransactionInfo

当已经建立事务连接并完成了事务的提取后,我们需要将所有的事务信息统一记录在TransactionInfo 类型的实例里面,这个实例包含了目标方法开始前的所有状态信息,一旦事务执行失败,Spring 会通过TransactionInfo 类型的实例中的信息来进行回滚等后续工作

方法prepareTransactionInfo 主要做了以下几件事:

  • 创建了一个TransactionInfo对象,把属性都塞进去

TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
  • 与线程绑定

txInfo.bindToThread();

继续向下跟进方法,可以看到实际上就是通过ThreadLocal绑定的

private void bindToThread() {
    this.oldTransactionInfo = transactionInfoHolder.get();
    transactionInfoHolder.set(this);
}
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
       new NamedThreadLocal<>("Current aspect-driven transaction");

commitTransactionAfterReturning

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
       if (logger.isTraceEnabled()) {
          logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
       }
       txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}

这里调用到AbstractPlatformTransactionManager#commit方法,点我跳转

completeTransactionAfterThrowing

方法主体就是一个大if,要求当前必须要有事务才执行回滚的流程

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
	if (txInfo != null && txInfo.getTransactionStatus() != null) {
		……
	}
}

然后看里面的逻辑:

if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
    try {
       txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
    }
    catch ...
}
else {
    // We don't roll back on this exception.
    // Will still roll back if TransactionStatus.isRollbackOnly() is true.
    try {
       txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
    catch ...
}

这里要求回滚事务必须基于某种异常,否则将继续commit,不会回滚,因此继续跟进默认实现DefaultTransactionAttribute#rollbackOn 方法:

return (ex instanceof RuntimeException || ex instanceof Error);

可见默认是对所有的RuntimeException和Error类型回滚,因此开发时要在@Transactional注解中加上里面的rollbackFor属性,指定为Exception.class,这种情况下会走RuleBasedTransactionAttribute#rollbackOn的流程

for (RollbackRuleAttribute rule : this.rollbackRules) {
    ……
    winner = rule;
}
return !(winner instanceof NoRollbackRuleAttribute);

这里可以看出来,从rollbackRules里面取出rule,然后判断它不是指定的不会滚属性,那就要回滚

或者也可以自行添加属性指定异常,例如:@Transactional(rollbackFor = NewException.class)

而rollbackFor属性在标签解析的parse流程中调用SpringTransactionAnnotationParser#parseTransactionAnnotation方法(点我跳转)被处理到了列表中,现在只是把它取出来

最后,判断可以回滚走到回滚流程

txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());

参考AbstractPlatformTransactionManager#rollback 方法(点我跳转

AnnotationTransactionAttributeSource

核心成员变量包括:

// 生成事务注解解析器的源码包是否存在
private static final boolean jta12Present;
private static final boolean ejb3Present;
// 事务标签解析器
private final Set<TransactionAnnotationParser> annotationParsers;

构造函数

this.annotationParsers.add(new SpringTransactionAnnotationParser());
if (jta12Present) {
	this.annotationParsers.add(new JtaTransactionAnnotationParser());
}
if (ejb3Present) {
	this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
}

在构造函数中初始化了三种解析器,默认的是SpringTransactionAnnotationParser,jta和ejb3都是如果要判断一下是否引入对应的代码包。

isCandidateClass

transaction切面匹配策略

遍历构造好的annotationParsers依次调用isCandidateClass方法。但实际上通过默认的SpringTransactionAnnotationParser的isCandidateClass方法基本就能匹配上。

getTransactionAttribute

获取事务属性框架

骨架逻辑是先从缓存中查询,如果差不到执行获取方法computeTransactionAttribute,获取后存入缓存。

TransactionAttribute cached = this.attributeCache.get(cacheKey);
if (cached != null) {
……
}
else {
TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
……
this.attributeCache.put(cacheKey, txAttr);

computeTransactionAttribute

超级核心的方法,用于获取事务

方法中可以看到很多spring事务失效的核心点。

if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
	return null;
}

这里要求事务的方法是 public的,不然不生效注解式事务里面 AnnotationTransactionAttributeSource 里面默认是 true 的

Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

这里适配了一些实现和桥接的问题:

因为注解的方法可能是在一个接口上,所以这里是获取target class的具体实现,比如:注解的method 为IFoo.bar() , targetClass 是 DefaultFoo ,这里是需要获取 DefaultFoo.bar()方法

同时这里还处理了可能出现桥接的问题。

但如果targetClass为null ,那方法就不会变。

后面做了三次解析尝试,分别解析了本方法上面的@Transactional如果没有则解析targetClass上面有没有,如果还没有则解析本方法的原始接口上面有没有。解析的方法都是以下方法:

TransactionAttribute txAttr = findTransactionAttribute(specificMethod);

实际核心解析方法是这里获取到的最终调用到determineTransactionAttribute

protected TransactionAttribute findTransactionAttribute(Method method) {
    return determineTransactionAttribute(method);
}

determineTransactionAttribute

protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
    for (TransactionAnnotationParser parser : this.annotationParsers) {
       TransactionAttribute attr = parser.parseTransactionAnnotation(element);
       if (attr != null) {
          return attr;
       }
    }
    return null;
}

还是之前构造好的解析器

当某个解析器可以获取到,就可以直接返回。默认首先通过SpringTransactionAnnotationParser解析器获取

下面专门看下SpringTransactionAnnotationParser

SpringTransactionAnnotationParser

isCandidateClass

方法匹配

其中调用的是AnnotationUtils中的判断方法:

public static boolean isCandidateClass(Class<?> clazz, String annotationName) {
	if (annotationName.startsWith("java.")) {
		return true;
	}
	if (AnnotationsScanner.hasPlainJavaAnnotationsOnly(clazz)) {
		return false;
	}
	return true;
}

所以它默认是java类型的都能匹配上

parseTransactionAnnotation(AnnotatedElement element)

事务注解解析

AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(element, Transactional.class, false, false);

可以看到这里解析的注解是Transactional.class,同时解析出当前bean中配置@Transactional标签中的所有属性

if (attributes != null) {
	return parseTransactionAnnotation(attributes);
}

如果属性存在,调用属性解析方法

parseTransactionAnnotation(AnnotationAttributes attributes)

事务注解属性解析,这里是真正的解析流程了。解析内容包括:

propagation、isolation、timeout、timeoutString、readOnly、value、label、rollbackFor、rollbackForClassName、noRollbackFor、noRollbackForClassName属性

以最常用的rollbackFor为例:

for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
	rollbackRules.add(new RollbackRuleAttribute(rbRule));
}

这里取到rollbackFor属性后,构造成RollbackRuleAttribute属性存到rollbackRules里面

AbstractPlatformTransactionManager

事务管理器模板

getTransaction

获取事务,到这里就比较核心了, 主要做了以下几件事:

  • 获取事务

  • 如果当前线程存在事务,则转向嵌套事务处理

  • 事务的超时设置验证

  • 事务的传播属性验证

  • 隔离级别, timeout ,connectinHolder 等配置

  • 绑定到当前线程

获取事务
Object transaction = doGetTransaction();

这里可以参考一个默认实现DataSourceTransactionManager#doGetTransaction

protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
          (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

核心逻辑就是在这里获取connection和事务:

ConnectionHolder conHolder =(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());

判断事务存在和嵌套
if (isExistingTransaction(transaction)) {
    // Existing transaction found -> check propagation behavior to find out how to behave.
    return handleExistingTransaction(def, transaction, debugEnabled);
}

如果事务已存在,这里走嵌套流程

private TransactionStatus handleExistingTransaction(
       TransactionDefinition definition, Object transaction, boolean debugEnabled)
       throws TransactionException {
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
       ……
    }
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
       ……
    }
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
      ……
    }
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
       ……
    }
    // PROPAGATION_REQUIRED, PROPAGATION_SUPPORTS, PROPAGATION_MANDATORY:
    // regular participation in existing transaction.
    if (debugEnabled) {
       logger.debug("Participating in existing transaction");
    }
    ……
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

可以看到该方法分别处理了PROPAGATION_NEVER、PROPAGATION_NOT_SUPPORTED、SYNCHRONIZATION_ALWAYS、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED几种事务传播场景,都是我们熟悉的场景

事务传播参考mysql部分补链接

具体处理流程先不继续看

继续验证事务传播属性

如果不存在事务,还要继续验证传播属性

if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    throw new IllegalTransactionStateException(
          "No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
       def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
       def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    SuspendedResourcesHolder suspendedResources = suspend(null);
    ……
    try {
       return startTransaction(def, transaction, debugEnabled, suspendedResources);
    }
    ……
}
else {
    ……
}

这里也通过几个if和elseif判断了PROPAGATION_MANDATORY、PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED几种场景

如果是PROPAGATION_MANDATORY,则必须在一个已有事务里面运行,如果没有,直接报错,如果是REQUIRED几种场景,则执行挂起操作

SuspendedResourcesHolder suspendedResources = suspend(null);

挂起的核心逻辑是判断当前线程的事务同步器是否是活跃状态,如果是,获取事务同步器快照,并把当前线程清空,最后返回一个新的SuspendedResourcesHolder对象。然后开启新事务

验证完成后开启事务:

try {
    return startTransaction(def, transaction, debugEnabled, suspendedResources);
}

这里继续向下看

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
       boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    ……
    doBegin(transaction, definition);
    ……
    return status;
}

核心在doBegin方法,还是看DataSourceTransactionManager#doGetTransaction 实现的方法:

commit

if (defStatus.isLocalRollbackOnly()) {
	……
	processRollback(defStatus, false);
	return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
	……
	processRollback(defStatus, true);
	return;
}

这里是处理之前catch异常流程中做的标记,如果是异常仅回滚,或是事务链的回滚标记,这里回滚掉就返回,

如果不需要回滚就提交

processCommit(defStatus);

rollback

public final void rollback(TransactionStatus status) throws TransactionException {
    ……
    processRollback(defStatus, false);
}

逻辑主要在这里,执行processRollback

processRollback

执行回滚

triggerBeforeCompletion(status);

提供事务回滚前的自定义触发器的调用

if (status.hasSavepoint()) {
	……
	status.rollbackToHeldSavepoint();
}

处理嵌套事务的savepoint点,针对的PROPAGATION_NESTED这种传播场景

else if (status.isNewTransaction()) {
    ……
    doRollback(status);
}

新事务场景,处理独立事务回滚

if (status.hasTransaction()) {
    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
       ……
       doSetRollbackOnly(status);
    }
    ……
}

处理嵌套事务场景,只打标记,要等到事务链执行完之后统一回滚

triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

自定义触发器,回滚后的调用

cleanupAfterCompletion(status);
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    ……
    if (status.isNewTransaction()) {
       doCleanupAfterCompletion(status.getTransaction());
    }
    ……
}

清空记录的资源并将挂起的资源恢复,这里只针对新资源进行清理,还是看到默认实现的DataSourceTransactionManager#doCleanupAfterCompletion部分

protected void doCleanupAfterCompletion(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    // 首先给连接释放资源
    if (txObject.isNewConnectionHolder()) {
       TransactionSynchronizationManager.unbindResource(obtainDataSource());
    }
    // 重置连接
    Connection con = txObject.getConnectionHolder().getConnection();
    try {
       // 这里可以看到对connection中的AutoCommit属性、隔离级别、读写属性做了重置
       if (txObject.isMustRestoreAutoCommit()) {
          con.setAutoCommit(true);
       }
       DataSourceUtils.resetConnectionAfterTransaction(
             con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
    }
    catch (Throwable ex) {
       logger.debug("Could not reset JDBC Connection after transaction", ex);
    }
    if (txObject.isNewConnectionHolder()) {
       if (logger.isDebugEnabled()) {
          logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
       }
       // 最后释放了连接
       DataSourceUtils.releaseConnection(con, this.dataSource);
    }
    txObject.getConnectionHolder().clear();
}

0

评论区