前言

上篇文章我们介绍了 Spring 事务管理相关的核心类,本篇文章我们就直接从事务源码开始分析,包括声明式事务和编程式事务。

本篇文章使用的 SpringBoot 版本是 3.4.1 ,对应 Spring 版本 6.2.1

SpringBoot & Spring 架构图示概览

这里我以 SpringBoot 源码入口为起点,画了一个相关的流程图,包含了 SpringBoot、Spring 事务、Spring AOP、Spring 事件、BeanFactoryPostProcessor、BeanPostProcessor 等所有 Spring 知识,以及相关模块之间的交互联系,后续也会持续更新此图(因为我自己还没有学完),我试了下作者侧这边更新后,分享的协作链接也会实时变更,希望对大家有帮助

SpringBoot & Spring 架构图 持续更新 对于即将需要面试的同学应该会比较有帮助!

声明式事务源码分析

环绕事务运行

//org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
       final InvocationCallback invocation) throws Throwable {

    // 事务属性元信息包装
    TransactionAttributeSource tas = getTransactionAttributeSource();
    //获取当前方法的事务属性(@Transactional 注解里面的信息)
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    //获取事务管理器
    final TransactionManager tm = determineTransactionManager(txAttr, targetClass);
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    //事务名称
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager cpptm)) {
       // 开启事务
       TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
       Object retVal;
       try {
          // 调用拦截器链中下一个拦截器,如果没有就调用原目标方法。简单的示例中通常只有 
          // TransactionInterceptor 一个拦截器,这行代码前后都可以理解为环绕通知(增强)
          retVal = invocation.proceedWithInvocation();
       }
       catch (Throwable ex) {
          //回滚/提交事务
          completeTransactionAfterThrowing(txInfo, ex);
          throw ex;
       }
       finally {
          //清除当前线程事务信息
          cleanupTransactionInfo(txInfo);
       }
       //如果返回值是 Future 特殊处理
       if (retVal != null && txAttr != null) {
          //...
       }

       //提交/回滚事务
       commitTransactionAfterReturning(txInfo);
       return retVal;
    }
    //......
}

这段代码很简单,主要逻辑主干都注释了,细节都在内部方法,下面依次介绍

开启事务 createTransactionIfNecessary

这个方法里面有一段很重要的代码

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    //...
    TransactionStatus status = null;
    if (txAttr != null) {
       if (tm != null) {
          //调用当前事务管理器获取事务,把事务信息封装到 TransactionStatus 中,默认实现类 DefaultTransactionStatus
          status = tm.getTransaction(txAttr);
       }
       //...
    }
    //...
}

我们重点看 getTransaction() 方法,事务管理器是如何实现的

//org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) {
    //...

    //从当前线程获取已存在的事务
    Object transaction = doGetTransaction();
    //如果是已存在的事务,结合已存在事务根据传播行为处理
    if (isExistingTransaction(transaction)) {
       return handleExistingTransaction(def, transaction, debugEnabled);
    }
    //...
    // 如果传播行为是必须有事务,而当前没有事务,直接抛异常
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
       throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    //如果是以下三种传播行为
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
          def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
          def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
          //先挂起当前事务
       SuspendedResourcesHolder suspendedResources = suspend(null);
       //开启新事务
       try {
          return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
       }
       catch (RuntimeException | Error ex) {
          //开启失败恢复被挂起的事务
          resume(null, suspendedResources);
          throw ex;
       }
    }
    //...
}

创建新事务

紧接着上一段,我们查看 AbstractPlatformTransactionManager#startTransaction 源码

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
       boolean nested, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, nested, debugEnabled, suspendedResources);
    //执行 事务开启前器
    this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
    try {
       //调用数据源事务管理器开启事务
       doBegin(transaction, definition);
    }
    catch (RuntimeException | Error ex) {
    //执行 事务开启后器(发生异常也执行)
       this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));
       throw ex;
    }
    prepareSynchronization(status, definition);
    //执行 事务开启后器
    this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null));
    return status;
}

我们可以看到这里在 doBegin()(像不像 MySQLBEGIN 命令) 前后,也就是事务开启前、开启后分别执行了器 TransactionExecutionListener 的回调方法。接下来我们细看 doBegin(transaction, definition) 是如何开启事务的。

//org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
       if (!txObject.hasConnectionHolder() ||txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
             //从数据源那里获取一个数据库链接对象
          Connection newCon = obtainDataSource().getConnection();
          //把获取到的 Connection 对象包装后保存下来
          txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
       }
       txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
       //这就是新获取到的链接对象
       con = txObject.getConnectionHolder().getConnection();

       Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
       //设置本次数据库事务操作的隔离级别
       txObject.setPreviousIsolationLevel(previousIsolationLevel);
       //设置事务是否只读
       txObject.setReadOnly(definition.isReadOnly());

       // 很关键的一步,把自动提交事务改为 false,不自动提交
       if (con.getAutoCommit()) {
          txObject.setMustRestoreAutoCommit(true);
          con.setAutoCommit(false);
       }

       //设置事务激活
       txObject.getConnectionHolder().setTransactionActive(true);
       //设置事务超时时间
       int timeout = determineTimeout(definition);
       if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
          txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
       }

       // 把事务信息绑定到当前的线程 Thread 中
       if (txObject.isNewConnectionHolder()) {
          TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
       }
    }
    catch (Throwable ex) {
       if (txObject.isNewConnectionHolder()) {
          //异常场景释放连接
          DataSourceUtils.releaseConnection(con, obtainDataSource());
          txObject.setConnectionHolder(null, false);
       }
       throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}

我们前面提到过,Spring 管理事务,本质上是通过修改 java.sql.Connection 对象属性来管理事务的。而 DataSourceTransactionManager#doBegin 方法就是最直接的体现,这里从当前数据源获取到一个 Connection 对象,修改隔离级别、是否超时、是否只读、是否自动提交 等事务相关属性,并且把 Connection 对象绑定到当前线程上

这段代码里有一个很重要的点,把 ConnectionautoCommit 改成 false ,我们可以查看 java.sql.ConnectionsetAutoCommit 注释

/**
* Sets this connection's auto-commit mode to the given state.
* If a connection is in auto-commit mode, then all its SQL
* statements will be executed and committed as individual
* transactions.  Otherwise, its SQL statements are grouped into
* transactions that are terminated by a call to either
* the method {@code commit} or the method {@code rollback}.
* By default, new connections are in auto-commit
* mode.
*/ 
public interface Connection  extends Wrapper, AutoCloseable {
   void setAutoCommit(boolean autoCommit) throws SQLException;
}

注释解释了,如果自动提交开启,每一条 SQL 语句都是单独的事务,即没有事务。如果改成 false,那么就是一组 SQL 语句会绑定在一个事务里,调用 commit() 方法提交,默认情况下,新的 Connection 链接,autoCommit = false

处理已存在的事务

上一段我们提到,如果运行时发现当前线程已经存在事务了,那么会走到处理已存在事务 handleExistingTransaction() 方法,查看源码

private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) {
    //如果传播行为是不能有事务,但是当前有事务,直接抛异常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
       throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
    }
    
    //如果传播行为是以非事务方式运行,那么就以非事务方式运行
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
       //挂起当前事务
       Object suspendedResources = suspend(transaction);
       boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
       return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    
    //如果传播行为是需要在新事务中运行
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
       //挂起当前事务
       SuspendedResourcesHolder suspendedResources = suspend(transaction);
       try {
          //新开启一个事务,这个方法上一段分析过
          return startTransaction(definition, transaction, false, debugEnabled, suspendedResources);
       }
       catch (RuntimeException | Error beginEx) {
          //出现异常,恢复上一个事务
          resumeAfterBeginException(transaction, suspendedResources, beginEx);
          throw beginEx;
       }
    }
    //...
}

挂起事务

前面我们已经看到过两次开启新事务了,无非就是获取一个新的 Connection 对象,然后绑定到当前线程,修改 Connection 的相关属性来控制事务,下面我们重点看当嵌套事务的时候,父方法的事务是如何被挂起的。

//org.springframework.transaction.support.AbstractPlatformTransactionManager#suspend
//挂起事务
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    //如果当前线程有事务同步器
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
       //调用同步器的 suspend() 生命周期方法
       List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
       try {
          Object suspendedResources = null;
          if (transaction != null) {
             //真正挂起事务
             suspendedResources = doSuspend(transaction);
          }
          //重置 ThreadLocal 事务相关信息
          String name = TransactionSynchronizationManager.getCurrentTransactionName();
          TransactionSynchronizationManager.setCurrentTransactionName(null);
          boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
          TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
          Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
          TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
          boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
          TransactionSynchronizationManager.setActualTransactionActive(false);
          //包装被挂起的事务信息
          return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
       }
       catch (RuntimeException | Error ex) {
          // 挂起事务异常,恢复同步器
          doResumeSynchronization(suspendedSynchronizations);
          throw ex;
       }
    }
    else if (transaction != null) {
       // 没有同步器的话,直接挂起即可
       Object suspendedResources = doSuspend(transaction);
       return new SuspendedResourcesHolder(suspendedResources);
    }
    //...
}

这里判断了同步器的存在,然后真正挂起事务后,把被挂起的事务封装成一个 SuspendedResourcesHolder 对象返回,因为我现在挂起了,后面肯定要恢复对不对?恢复的时候肯定需要知道被挂起的事务的相关信息,这里没有地方能存储,所以包装成一个对象返回回去。然后我们看真正挂起方法,是如何挂起的

//org.springframework.jdbc.datasource.DataSourceTransactionManager#doSuspend
@Override
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    //从当前线程解绑事务
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

这里太简单了,之前开启事务是怎么做的?获取一个 Connection 链接,然后封装一个对象出来绑定到当前线程,这里要挂起之前的事务,直接从当前线程移除就完了。

提交事务

retVal = invocation.proceedWithInvocation(); 目标方法执行后,后置增强在没有异常的情况下会提交事务,从 org.springframework.transaction.interceptor.TransactionAspectSupport#commitTransactionAfterReturning 方法跟踪源码,最终调用事务管理器的提交方法,内部调用 processCommit()

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
       boolean beforeCompletionInvoked = false;
       boolean commitListenerInvoked = false;

       try {
          boolean unexpectedRollback = false;
          //可扩展
          prepareForCommit(status);
          //调用同步器的 beforeCommit() 方法
          triggerBeforeCommit(status);
          //调用同步器的 beforeCompletion() 方法
          triggerBeforeCompletion(status);
          beforeCompletionInvoked = true;

          //...
          //如果事务是自己创建的,才提交,这个属性很重要
          //在两个或多个嵌套事务方法中,如果传播行为都是 REQUIRED ,那么内部方法
          //执行后置增强准备提交事务时,发现这个属性不为 true 即代表这个事务不是自己创建的
          //那么就不能提交事务,因为子方法已经加入了父方法的事务中,要等父方法执行完毕才能提交
          else if (status.isNewTransaction()) {
             if (status.isDebug()) {
                logger.debug("Initiating transaction commit");
             }
             unexpectedRollback = status.isGlobalRollbackOnly();
             //执行器事务提交前方法
             this.transactionExecutionListeners.forEach(listener -> listener.beforeCommit(status));
             commitListenerInvoked = true;
             //真正向数据库提交事务,仅仅是调用 Connection.commit();
             doCommit(status);
          }
          else if (isFailEarlyOnGlobalRollbackOnly()) {
             unexpectedRollback = status.isGlobalRollbackOnly();
          }

          // 我相信大家应该都遇到过这个异常,还是嵌套两个 REQUIRED 传播行为的事务方法,
          // 内部方法抛出异常,但是在外层方法中 try-catch 了,由于他们两属于同一个事务
          // 内部方法标记事务应该回滚,但是外层方法捕获了内层方法异常,导致外层事务准备提交
          // 此时同一个事务既要回滚又要提交,就会报错
          if (unexpectedRollback) {
             throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");
          }
       }
       catch (UnexpectedRollbackException ex) {
          //执行同步器 afterCompletion() 方法
          triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
          //执行器 afterRollback() 方法
          this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null));
          throw ex;
       }
       //...
}

这里我们重点查看了处理事务提交的逻辑,值得注意的是,处理事务提交的逻辑中也掺杂了回滚的逻辑,因为嵌套事务的存在导致事务的管理较为复杂。

回滚事务

好,这里我们继续看 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction 方法后置增强中,当出现异常时回滚的逻辑,跟踪 completeTransactionAfterThrowing() 源码

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
       //如果当前异常是需要回滚的 @Transactional 的 rollbackFor 属性
       if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
          try {
             //调用事务管理器的回滚
             txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
          }
          //...
       }
       else {
          //如果当前异常不需要回滚,那就提交事务
          try {
             txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
          }
          //...
       }
    }
}

接着我们看事务管理器的回滚方法是如何实现的,它调用了 processRollback() 方法

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
       boolean unexpectedRollback = unexpected;
       boolean rollbackListenerInvoked = false;

       try {
          //调用同步器的 beforeCompletion() 生命周期方法
          triggerBeforeCompletion(status);

          if (status.hasSavepoint()) {
             //如果有保存点,回滚到保存点
          }
          //如果是当前方法创建的事务,才回滚
          else if (status.isNewTransaction()) {
             //执行器回滚前方法
             this.transactionExecutionListeners.forEach(listener -> listener.beforeRollback(status));
             rollbackListenerInvoked = true;
             //真正的回滚逻辑
             doRollback(status);
          }
          else {
             // 如果不是当前方法创建的事务,也没有保存点,那就设置事务应该回滚的标记
             // 比如两个 REQUIRED 传播行为的嵌套事务,内层抛异常,外层捕获了内层的异常
             // 那么当内层方法执行完毕后不会回滚事务,而是标记当前事务应该回滚,等外层方法执行结束后
             // 才回滚,上面我们已经介绍过这种场景,由于内层方法标记当前事务应该回滚
             // 外层方法没有异常准备提交,同一个事务既要提交又要回滚则会出现异常,最终报错回滚
             if (status.hasTransaction()) {
                if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                   if (status.isDebug()) {
                      logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                   }
                   doSetRollbackOnly(status);
                }
             }
            //...
          }
       }
       //...
    finally {
       //清除当前线程绑定的事务信息
       cleanupAfterCompletion(status);
    }
}

恢复事务

前面我们介绍了事务的创建、提交、回滚还有嵌套事务时的挂起,既然挂起了事务,后面肯定需要恢复。我们还是以一个 REQUIRED 事务内部嵌套一个 REQUIRED_NEW 事务为例来分析,什么时候需要恢复事务

@Transactional
public void test(){
    test2Service.testNewTx();
}

@Service
public class Test2Service{
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void testNewTx(){
       //db 操作...
    }
}

还是以这段嵌套代码为例,当执行 testNewTx() 的前置增强时会挂起 test() 方法的事务,那么肯定是 testNewTx() 的后置增强时恢复 test() 方法的事务,通过前面几段内容我们知道了,挂起事务其实就是清除了绑定到当前线程的事务信息,那么恢复事务的时候肯定是要把这个信息再绑定到当前线程,也就是放入 TransactionSynchronizationManager 的几个 ThreadLocal 中。

那么关键的问题就来了,挂起的时候都已经从 ThreadLocal 移除了,恢复的时候要从哪拿这些被挂起事务的信息呢?毕竟 TransactionSynchronizationManager 内部没有专门提供一个 ThreadLocal 的变量来存储被挂起的事务信息。

这个问题的关键还是在 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction 方法中,这个方法是所有嵌套事务方法的执行连接纽带,无论嵌套多少层方法,都是执行代理类的环绕增强,而代理类的环绕前置增强会调用目标方法,进而在目标方法执行过程中再执行内部嵌套方法的增强。

这句话可能有点绕口,下面我们通过图示来理解,两个方法的环绕增强执行顺序如下

事务管理的流程图如下

有了这个理解,我们再来理解 被挂起的事务的恢复 会简单很多,当内层方法执行完毕 testNewTx() 原始方法后,也就是 TransactionAspectSupport#invokeWithinTransaction 的如下片段

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,InvocationCallback invocation) throws Throwable {
       //...
       TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
       Object retVal;
       try {
          // 原始方法 testNewTx() 
          retVal = invocation.proceedWithInvocation();
       }
       catch (Throwable ex) {
          // target invocation exception
          completeTransactionAfterThrowing(txInfo, ex);
          throw ex;
       }
       // txInfo 这个对象里面存储了 父方法 test() 的事务信息
       commitTransactionAfterReturning(txInfo);
       return retVal;
    }

注意 createTransactionIfNecessary() 方法返回了 TransactionInfo txInfo被挂起的事务信息就被封装在其中,当内层方法 testNewTx() 事务提交后,会执行 org.springframework.transaction.support.AbstractPlatformTransactionManager#cleanupAfterCompletion 方法,查看源码

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    //如果是当前方法创建的事务,提交后这里清除 ThreadLocal 信息
    if (status.isNewSynchronization()) {
       TransactionSynchronizationManager.clear();
    }
    
    //如果是当前方法创建的事务,提交后这里重置、释放 Connection 链接
    if (status.isNewTransaction()) {
       doCleanupAfterCompletion(status.getTransaction());
    }
    //如果存在之前被挂起的事务
    if (status.getSuspendedResources() != null) {
       Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
       //真正恢复事务
       resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}

这里我们查看恢复事务的源码

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) {

    if (resourcesHolder != null) {
       Object suspendedResources = resourcesHolder.suspendedResources;
       if (suspendedResources != null) {
          //真正恢复
          doResume(transaction, suspendedResources);
       }
       List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
       if (suspendedSynchronizations != null) {
          TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
          TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
          TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
          TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
          //调用同步器的 doResume() 方法
          doResumeSynchronization(suspendedSynchronizations);
       }
    }
}

再查看 doResume() 源码

//org.springframework.jdbc.datasource.DataSourceTransactionManager#doResume
@Override
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
    TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}

可以看到和挂起事务一样的简单,只是把被挂起的事务信息再放回到当前线程的 ThreadLocal 里面

编程方式 PlatformTransactionManager

下面我们看当使用 PlatformTransactionManager 时如何管理编程式事务

@Service
public class OrderService {
    
    @Autowired
    private PlatformTransactionManager transactionManager;
    public void createOrder(Order order) {
        // 1. 定义事务属性
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
        def.setTimeout(30);
        def.setReadOnly(false);
        
        // 2. 获取事务
        TransactionStatus status = transactionManager.getTransaction(def);
        
        try {
            // 3. 执行业务操作
            orderDao.insert(order);
            orderDao.updateStock(order.getItems());
            // 4. 提交事务
            transactionManager.commit(status);
        } catch (Exception e) {
            // 5. 发生异常,回滚事务
            transactionManager.rollback(status);
            throw new RuntimeException("创建订单失败", e);
        }
    }
}

是不是发现特别简单?又回归到了事务本质,获取事务、提交事务、回滚事务。那么为什么声明式事务处理源码会那么多,那么复杂呢?这是因为声明式事务是一套能满足所有传播行为的公共逻辑而已,假如我们上面的示例非常复杂,会嵌套很多事务,那么我们也需要调用很多次 PlatformTransactionManager 的三个核心方法。

再看 TransactionTemplate 源码分析

以上篇文章的 TransactionTemplate 的示例为例

public void test2(){
    transactionTemplate.execute(new TransactionCallbackWithoutResult() {
        @Override
        protected void doInTransactionWithoutResult(TransactionStatus status) {
            try {
                //业务行为
                System.out.println(1);
            } catch (Exception e){
                status.setRollbackOnly();
                throw new RuntimeException("创建订单失败", e);
            }
        }
    });


}

我们看 transactionTemplate.execute() 源码

@Override
@Nullable
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
    //...
    else {
       TransactionStatus status = this.transactionManager.getTransaction(this);
       T result;
       try {
          result = action.doInTransaction(status);
       }
       catch (RuntimeException | Error ex) {
          // Transactional code threw application exception -> rollback
          rollbackOnException(status, ex);
          throw ex;
       }
       catch (Throwable ex) {
          // Transactional code threw unexpected exception -> rollback
          rollbackOnException(status, ex);
          throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
       }
       this.transactionManager.commit(status);
       return result;
    }
}

瞬间柳暗花明,原来 transactionTemplate 的编程式事务就是通过手动调用 PlatformTransactionManager 的核心方法来实现的,它把我们要处理的业务逻辑作为一个回调函数传递进去,然后在这个业务方法的回调函数前后来开启事务、提交/回滚事务。也变相的相当于环绕通知,可以理解为一个静态代理!

TransactionTemplate 总结

TransactionTemplate 相当于是一个封装的事务管理的工具类!它对我们要执行的业务逻辑做了一个静态代理!

TransactionTemplate 的最大优势是:在保持代码简洁的同时,提供了编程式事务的灵活性。  它是声明式事务(@Transactional)和纯编程式事务(PlatformTransactionManager)之间的相对完美平衡点。

常见疑惑案例

事务被标记回滚

@Transactional
public void test1(){
    //业务操作
    try {
        testService2.test2();
    }catch (Exception e){
        e.printStackTrace();
    }
}
//testService2
@Transactional(propagation = Propagation.REQUIRED)
public void test2() {
    //业务操作
    throw new RuntimeException("测试内层回滚");
}

由于子方法 test2() 中传播行为用的是加入当前事务,所以这两个方法的事务是同一个,那么 test2() 内部抛出异常,在 test1() 方法中捕获,test2() 执行完毕后标记了当前事务需要回滚,但是 test1() 捕获了异常,执行完毕后又需要提交,对于同一个事务先标记了回滚,后面又要正常提交,源码无法决定最终行为,抛出异常。

如果 test2() 传播行为用 Propagation.REQUIRED_NEW 则不会有这个问题

无法读到新事务已提交的修改

@Transactional
public void test11(){
   //假设原本状态是 WAITING
   CashRepayApply before =  cashRepayApplyMapper.get("TQYHK2c61e231e59c44cfafbe1bf1edf5b2be");
   System.out.println(before.getRepayStatus());
   //开启新事物更新它的状态
   listenerTestService2.test12();
   //重新查询
   CashRepayApply apply = cashRepayApplyMapper.get("TQYHK2c61e231e59c44cfafbe1bf1edf5b2be");
   //输出的还是 WAITING,MyBatisPlus、Hibernate 等框架无法读取到 test12() 中对它做的更新。虽然 DB 中已经更新了
   System.out.println(apply.getRepayStatus());
}
//test2Service
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void test12() {
    //新事务中修改它
    CashRepayApply apply = cashRepayApplyMapper.get("TQYHK2c61e231e59c44cfafbe1bf1edf5b2be");
    apply.setRepayStatus("SUCCESS");
    cashRepayApplyMapper.updateById(apply);
}

上面的示例中,虽然 test12() 是一个新事务,按道理说在后续执行 test11() 代码时 test12() 已经提交了最新修改到数据库中,但是 test11() 后续将极大概率无法读取到 test12() 对数据做的最新修改,这是因为 ORM 框架都有缓存,相同的当前事务上下文,对同一条数据进行了缓存,相同条件的再次查询时不会再次访问数据库。

要解决这个问题,我们可以手动清除缓存,不同的框架有不同的处理方式,比如 MyBatisPlusHibernate JPA ,这里不再赘述

结语

本篇文章介绍了 Spring 对事务管理的源码解读,文章的方式始终无法生动的介绍到代码细节,后续如果有空会和 Java 并发编程 AQS 介绍和源码解析(配视频) 一样上传视频。

如果这篇文章对你有帮助,记得点攒加关注!你的支持就是我继续创作的动力!

本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:alixiixcom@163.com