不太稳健的小蚂蚁 发表于 2021-9-16 23:25:24

Spring事务处理机制源码分析以及踩坑小记

前言

我们都知道spring有声明式事务和编程式事务,声明式只需要提供@Transactional的注解,然后事务的开启和提交/回滚、资源的清理就都由spring来管控,我们只需要关注业务代码即可;而编程式事务则需要利用spring提供的模板,如TransactionTemplate,大概直接利用底层的PlatformTransactionManager。
声明式事务的最大优点就是对代码的侵入性较小,只需要在方法上加@Transactional的注解就可以实现事务;编程式事务的最大优点就是事务的管控粒度较细,可以实当代码块级别的事务。
背景

简单先容完spring的事务机制那就要引入这一次碰到的问题了,我相信大多数人应该和我一样,只知道怎么利用,好比加个注解啥的,但是底层原理不清楚。好一点的知道AOP动态署理,再好一点就是知道事务的传播机制(一样寻常也就用用默认的REQUIRED)。真正底层的事务处理的源码很多人应该是没有看过的,固然我也是没有滴~~ 但是这一次碰到的问题让我不得不去看源码了。
这段时间不停在做代码的重构,既然是重新写的代码,固然想写得漂亮一点,不然是要被后人戳脊梁骨的~~ 效果全部代码都写完,都提测了,在测试环境却报一个诡异的异常
java.sql.SQLException: PooledConnection has already been closed而且这不是必现的,而一旦出现,那任何涉及数据库连接的接口都有可能报这个错。从字面意思看是用到的数据库连接被关闭了,但是理解不能啊,这种底层的事情不都是spring帮忙做好了么。建议测试重启机器,心中期待不会再现。
https://p6.toutiaoimg.com/large/pgc-image/4094852035dc4e9aa98694bee62ceb88
效果依然出现,而且频率还不低,都阻塞测试了。。那就只好操起久违的调试源码的大刀,硬着头皮上了。
过程

本次源码利用的是spring版本是 4.2.4.RELEASE,事务管理器则是参照项目利用的DataSourceTransactionManager。
入口


[*]首先是事务的入口,spring用的是动态署理,如果某个方法被标注了@Transactional,则会由TransactionInterceptor拦截,在原始方法的前后增长一些额外的处理。可以看到,调用的是TransactionInterceptor的invoke方法,而内部又调用了invokeWithinTransaction方法,但其实这个并不肯定会创建事务(事务传播机制里有几种情况是不需要大概不支持事务的)。
public Object invoke(final MethodInvocation invocation) throws Throwable {    // Work out the target class: may be {@code null}.    // The TransactionAttributeSource should be passed the target class    // as well as the method, which may be from an interface.    Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);    // Adapt to TransactionAspectSupport's invokeWithinTransaction...    return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {      @Override      public Object proceedWithInvocation() throws Throwable {            return invocation.proceed();      }    });}
[*]TransactionInterceptor继承了TransactionAspectSupport这个抽象类,invokeWithinTransaction这个方法是在父类中的。方法里的英文注释是源码中的,中文注释是我加上去的。CallbackPreferringPlatformTransactionManager是实现了PlatformTransactionManager接口,如果利用的事务管理器是CallbackPreferringPlatformTransactionManager的实现,则会将事务的控制交由这个类的execute方法,这里先省略。我们来看一样寻常情况(很多应该用的是DataSourceTransactionManager吧),总的来说可以将这个方法分为几部分:
https://p6.toutiaoimg.com/large/pgc-image/235a46ee50b2477bb71d53b0c82f41e9
protected Object invokeWithinTransaction(Method method, Class targetClass, final InvocationCallback invocation) throws Throwable {    // If the transaction attribute is null, the method is non-transactional.    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);    final PlatformTransactionManager tm = determineTransactionManager(txAttr);    final String joinpointIdentification = methodIdentification(method, targetClass);    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {      // Standard transaction demarcation with getTransaction and commit/rollback calls.      // 如果当火线法需要事务则会创建事务(@Transactional不代表肯定创建事务,可以看spring的事务传播机制)      TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);      Object retVal = null;      try {            // This is an around advice: Invoke the next interceptor in the chain.            // This will normally result in a target object being invoked.            // 可以将这个方法视为调用真正的业务方法(其实内部还有一些拦截器的处理)            retVal = invocation.proceedWithInvocation();      }      catch (Throwable ex) {            // target invocation exception            // 事务抛出异常的时候的处理            completeTransactionAfterThrowing(txInfo, ex);            throw ex;      }      finally {            // 只做一件事,就是把事务的上下文信息改回本事务开始之前的上下文            // 由于有可能本事务是被包裹在其他事务中的,可以看spring的事务传播机制            cleanupTransactionInfo(txInfo);      }      // 事务执行成功后将事务的状态信息提交      commitTransactionAfterReturning(txInfo);      return retVal;    }    else {      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.      // 省略    }}createTransactionIfNecessary

这里要知道几个类的含义:

[*]TransactionAttribute 事务的属性,好比我们在@Transactional内里的一些定义,利用的事务管理器、事务隔离级别、超时时间等;
[*]TransactionStatus 事务的运行时状态,如是否已完成等;
[*]TransactionInfo 事务信息的聚合,包含了事务属性、事务状态、事务管理器、被事务包裹的方法定义信息、事务执行前的外层事务信息等。
https://p3.toutiaoimg.com/large/pgc-image/a83bb756c27e419099e606427573eec8
这里复杂的是获取事务的方法,其他方法做的事情见我的中文注释。
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {    // If no name specified, apply method identification as transaction name.    // 新建一个TransactionAttribute的署理对象,其实用的是装饰器模式    if (txAttr != null && txAttr.getName() == null) {      txAttr = new DelegatingTransactionAttribute(txAttr) {            @Override            public String getName() {                return joinpointIdentification;            }      };    }    TransactionStatus status = null;    if (txAttr != null) {      if (tm != null) {            // 这里会根据事务管理器获取事务对象            status = tm.getTransaction(txAttr);      }      else {            if (logger.isDebugEnabled()) {                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +                        "] because no transaction manager has been configured");            }      }    }    // 将事务信息聚合然后返回,这里会有一个事务信息绑定到当火线程的操作(外层事务信息会存下来)    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);}
[*]getTransaction方法


[*]TransactionSynchronizationManager 以threadLocal的方式保存当火线程事务相关信息的对象
这里省略了一些不重要的流程,重点是doBegin方法,这里会开启事务
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {    // 会新建一个事务对象,从TransactionSynchronizationManager中获取当火线程持有的数据库连接的句柄    //如果是最开始的事务,这个句柄是会为null的,如果是内层事务,则会复用连接    Object transaction = doGetTransaction();    // 省略      // 当火线程关联的数据库连接存在且事务处于激活状态,那么当前事务会根据事务传播机制来处理当前事务    if (isExistingTransaction(transaction)) {      // Existing transaction found -> check propagation behavior to find out how to behave.      return handleExistingTransaction(definition, transaction, debugEnabled);    }    // 省略    // No existing transaction found -> check propagation behavior to find out how to proceed.    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {      // 省略    }    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {      SuspendedResourcesHolder suspendedResources = suspend(null);      if (debugEnabled) {            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);      }      try {            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);            DefaultTransactionStatus status = newTransactionStatus(                  definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);            // 开启事务            doBegin(transaction, definition);            // 将事务信息绑定到当火线程(存在TransactionSynchronizationManager的threadLocal中)            prepareSynchronization(status, definition);            return status;      }      catch (RuntimeException ex) {            resume(null, suspendedResources);            throw ex;      }      catch (Error err) {            resume(null, suspendedResources);            throw err;      }    }    else {      // Create "empty" transaction: no actual transaction, but potentially synchronization.      // 这里就是前面说的不需要事务的情况      if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {            logger.warn("Custom isolation level specified but no actual transaction initiated; " +                  "isolation level will effectively be ignored: " + definition);      }      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);      return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);    }}
[*]doBegin方法
protected void doBegin(Object transaction, TransactionDefinition definition) {    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;    Connection con = null;    try {      if (txObject.getConnectionHolder() == null ||                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {            // 从dataSource从获取一个连接,如果利用了连接池,则会从连接池中获取            Connection newCon = this.dataSource.getConnection();            if (logger.isDebugEnabled()) {                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");            }            // 事务对象设置连接的句柄            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);      }      txObject.getConnectionHolder().setSynchronizedWithTransaction(true);      con = txObject.getConnectionHolder().getConnection();      Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);      txObject.setPreviousIsolationLevel(previousIsolationLevel);      // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,      // so we don't want to do it unnecessarily (for example if we've explicitly      // configured the connection pool to set it already).      // 省略      // Bind the session holder to the thread.      // 绑定数据库连接到当火线程,这里的key是dataSource,以是如果事务中换了dataSource那事务就不生效了      if (txObject.isNewConnectionHolder()) {            TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());      }    }    catch (Throwable ex) {      // 释放连接回连接池      // 当火线程持有的连接句柄也一并释放    }}到此为止,事务的信息全部准备好了,事务也开启了,这个时候业务方法就是在事务中执行了(如果配置了需要事务的话)。


事务执行完毕是需要举行资源的清理和释放的,spring在开启事务的时候绑定了很多信息到线程中,而现在的应用出于资源和性能的考虑,基本用的都是连接池和线程池,会有复用的可能性,如果资源的释放大概清理不到位,会有莫名其妙的问题出现(我这一次的问题就是这么来的。。。固然不是框架的问题,是本身操作有误)。
commitTransactionAfterReturning

这个方法是在业务方法正常返回后执行的,如果当前是存在事务的,则会调用事务管理器的commit方法
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {    if (txInfo != null && txInfo.hasTransaction()) {      if (logger.isTraceEnabled()) {            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");      }      txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());    }}
[*]commit方法
这里会有一些标志位的检测,如果设置为true,那事务是不会提交的,会回滚。好比单元测试的时候不管什么情况我们都不想提交,spring就是靠这个标志位实现的。processRollback方法会在后面分析回滚的时候用到,这里先略过。
public final void commit(TransactionStatus status) throws TransactionException {    if (status.isCompleted()) {      throw new IllegalTransactionStateException(                "Transaction is already completed - do not call commit or rollback more than once per transaction");    }    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;    if (defStatus.isLocalRollbackOnly()) {      if (defStatus.isDebug()) {            logger.debug("Transactional code has requested rollback");      }      processRollback(defStatus);      return;    }    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {      if (defStatus.isDebug()) {            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");      }      processRollback(defStatus);      // Throw UnexpectedRollbackException only at outermost transaction boundary      // or if explicitly asked to.      if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {            throw new UnexpectedRollbackException(                  "Transaction rolled back because it has been marked as rollback-only");      }      return;    }    processCommit(defStatus);}题外话

还记得TransactionSynchronizationManager这个类吗?内里维护了当火线程的一些信息,此中有一个就是TransactionSynchronization的列表,我们可以自定义实现一个TransactionSynchronization然后在事务中绑定到当火线程,这样可以实现在事务提交前大概提交后大概完成后执行一些我们自定义的操作。这次出现的问题就是由于我们业务代码里有自定义实现的TransactionSynchronization,至于具体原因后面再详述。

[*]processCommit方法
各个方法做的事见中文注释,这里要注意cleanupAfterCompletion方法,会去清理相关的资源。
private void processCommit(DefaultTransactionStatus status) throws TransactionException {    try {      boolean beforeCompletionInvoked = false;      try {            prepareForCommit(status);            // 调用当火线程的TransactionSynchronization列表的对应方法            // 这里注意,beforeCompletion方法的异常是会被吞掉的,beforeCommit的异常则会传播出去            triggerBeforeCommit(status);            triggerBeforeCompletion(status);            beforeCompletionInvoked = true;            boolean globalRollbackOnly = false;            if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {                globalRollbackOnly = status.isGlobalRollbackOnly();            }            //如果用到了spring的NESTED传播,底层用到了数据库的savePoint,以是这里会释放            if (status.hasSavepoint()) {                if (status.isDebug()) {                  logger.debug("Releasing transaction savepoint");                }                status.releaseHeldSavepoint();            }            // 只有最外层的事务这里才是true            else if (status.isNewTransaction()) {                if (status.isDebug()) {                  logger.debug("Initiating transaction commit");                }                // 这里调用底层数据库连接的commit方法提交事务                doCommit(status);            }            // Throw UnexpectedRollbackException if we have a global rollback-only            // marker but still didn't get a corresponding exception from commit.            if (globalRollbackOnly) {                throw new UnexpectedRollbackException(                        "Transaction silently rolled back because it has been marked as rollback-only");            }      }      catch (UnexpectedRollbackException ex) {            // can only be caused by doCommit            // 这里调用TransactionSynchronization列表的afterCompletion方法,会吞掉异常            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);            throw ex;      }      catch (TransactionException ex) {            // can only be caused by doCommit            if (isRollbackOnCommitFailure()) {                // 如果提交异常这里会回滚事务,里层也是调用TransactionSynchronization列表的afterCompletion方法                // 只不过如果回滚失败,事务状态就是未知                doRollbackOnCommitException(status, ex);            }            else {                // 单纯调用TransactionSynchronization列表的afterCompletion方法                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);            }            throw ex;      }      catch (RuntimeException ex) {            if (!beforeCompletionInvoked) {                // 如果前面beforeCompletion未调用,则这里调一次                triggerBeforeCompletion(status);            }            // 回滚事务            doRollbackOnCommitException(status, ex);            throw ex;      }      catch (Error err) {            if (!beforeCompletionInvoked) {                triggerBeforeCompletion(status);            }            doRollbackOnCommitException(status, err);            throw err;      }      // Trigger afterCommit callbacks, with an exception thrown there      // propagated to callers but the transaction still considered as committed.      try {            // 调用TransactionSynchronization列表的afterCommit方法            triggerAfterCommit(status);      }      finally {            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);      }    }    finally {      cleanupAfterCompletion(status);    }}
[*]cleanupAfterCompletion方法
这里关于资源的清理和释放操作比较多,稍有不慎,万劫不复啊。。。
private void cleanupAfterCompletion(DefaultTransactionStatus status) {    // 将事务状态设为已完成    status.setCompleted();    // 最外层事务会去清理线程绑定的资源,包含TransactionSynchronization列表    if (status.isNewSynchronization()) {      TransactionSynchronizationManager.clear();    }    if (status.isNewTransaction()) {      // 从当火线程绑定的资源中移除数据库连接句柄      // 将连接的一些属性重置,恢复默认值      // 将连接还给连接池(如果没用连接池会直接关闭连接)      // 解除事务与连接的绑定关系      doCleanupAfterCompletion(status.getTransaction());    }    // 用于事务的挂起和恢复,这里先略过    if (status.getSuspendedResources() != null) {      if (status.isDebug()) {            logger.debug("Resuming suspended transaction after completion of inner transaction");      }      resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());    }}completeTransactionAfterThrowing

业务方法抛出异常后会执行本方法,主要就是事务的回滚以及定义的TransactionSynchronization列表的关联事务方法的执行,上面有提到,这里就不详述了。
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {    if (txInfo != null && txInfo.hasTransaction()) {      if (logger.isTraceEnabled()) {            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +                  "] after exception: " + ex);      }      if (txInfo.transactionAttribute.rollbackOn(ex)) {            try {                // 里层调用的就是processRollback方法                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());            }            catch (TransactionSystemException ex2) {                logger.error("Application exception overridden by rollback exception", ex);                ex2.initApplicationException(ex);                throw ex2;            }            catch (RuntimeException ex2) {                logger.error("Application exception overridden by rollback exception", ex);                throw ex2;            }            catch (Error err) {                logger.error("Application exception overridden by rollback error", ex);                throw err;            }      }      else {            // We don't roll back on this exception.            // Will still roll back if TransactionStatus.isRollbackOnly() is true.            try {                // 如果抛出的异常不属于回滚异常范围内,则事务依然提交                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());            }            catch (TransactionSystemException ex2) {                logger.error("Application exception overridden by commit exception", ex);                ex2.initApplicationException(ex);                throw ex2;            }            catch (RuntimeException ex2) {                logger.error("Application exception overridden by commit exception", ex);                throw ex2;            }            catch (Error err) {                logger.error("Application exception overridden by commit error", ex);                throw err;            }      }    }}
[*]processRollback方法
这里很多方法前面都有提到,不详述了。
private void processRollback(DefaultTransactionStatus status) {    try {      try {            triggerBeforeCompletion(status);            if (status.hasSavepoint()) {                if (status.isDebug()) {                  logger.debug("Rolling back transaction to savepoint");                }                status.rollbackToHeldSavepoint();            }            else if (status.isNewTransaction()) {                if (status.isDebug()) {                  logger.debug("Initiating transaction rollback");                }                // 调用数据库连接的回滚方法                doRollback(status);            }            else if (status.hasTransaction()) {                if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {                  if (status.isDebug()) {                        logger.debug("Participating transaction failed - marking existing transaction as rollback-only");                  }                  doSetRollbackOnly(status);                }                else {                  if (status.isDebug()) {                        logger.debug("Participating transaction failed - letting transaction originator decide on rollback");                  }                }            }            else {                logger.debug("Should roll back transaction but cannot - no transaction available");            }      }      catch (RuntimeException ex) {            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);            throw ex;      }      catch (Error err) {            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);            throw err;      }      triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);    }    finally {      cleanupAfterCompletion(status);    }}总结

到此为止,spring关于事务的处理的源码差不多分析完了,回到正题,为啥会出现连接已关闭的情况呢?由于我们自定义了一个TransactionSynchronization来实现事务事件触发机制,而且在TransactionSynchronization的afterCompletion方法中操作了Dao层,也就是用到了数据库连接。看一下afterCompletion方法的注释,内里有提到这个时候事务已经提交大概回滚了,但是相关资源可能还没有释放,以是一旦有与数据库连接相关的代码,可能会到场到前面的事务中去。如果这里非要执行与数据库连接相关的操作,spring建议明确标注,而且利用新开事务的传播机制。框架封装好,利用需审慎啊。
/** * Invoked after transaction commit/rollback. * Can perform resource cleanup after transaction completion. *
NOTE: The transaction will have been committed or rolled back already, * but the transactional resources might still be active and accessible. As a * consequence, any data access code triggered at this point will still "participate" * in the original transaction, allowing to perform some cleanup (with no commit * following anymore!), unless it explicitly declares that it needs to run in a * separate transaction. Hence: Use {@code PROPAGATION_REQUIRES_NEW} * for any transactional operation that is called from here. * @param status completion status according to the {@code STATUS_*} constants * @throws RuntimeException in case of errors; will be logged but not propagated * (note: do not throw TransactionException subclasses here!) * @see #STATUS_COMMITTED * @see #STATUS_ROLLED_BACK * @see #STATUS_UNKNOWN * @see #beforeCompletion */void afterCompletion(int status);
页: [1]
查看完整版本: Spring事务处理机制源码分析以及踩坑小记