简单先容完spring的事务机制那就要引入这一次碰到的问题了,我相信大多数人应该和我一样,只知道怎么利用,好比加个注解啥的,但是底层原理不清楚。好一点的知道AOP动态署理,再好一点就是知道事务的传播机制(一样寻常也就用用默认的REQUIRED)。真正底层的事务处理的源码很多人应该是没有看过的,固然我也是没有滴~~ 但是这一次碰到的问题让我不得不去看源码了。
这段时间不停在做代码的重构,既然是重新写的代码,固然想写得漂亮一点,不然是要被后人戳脊梁骨的~~ 效果全部代码都写完,都提测了,在测试环境却报一个诡异的异常
java.sql.SQLException: PooledConnection has already been closed而且这不是必现的,而一旦出现,那任何涉及数据库连接的接口都有可能报这个错。从字面意思看是用到的数据库连接被关闭了,但是理解不能啊,这种底层的事情不都是spring帮忙做好了么。建议测试重启机器,心中期待不会再现。
本次源码利用的是spring版本是 4.2.4.RELEASE,事务管理器则是参照项目利用的DataSourceTransactionManager。
public Object invoke(final MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() { @Override public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } });}
protected Object invokeWithinTransaction(Method method, Class targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. // 如果当火线法需要事务则会创建事务(@Transactional不代表肯定创建事务,可以看spring的事务传播机制) TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. // 可以将这个方法视为调用真正的业务方法(其实内部还有一些拦截器的处理) retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception // 事务抛出异常的时候的处理 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 只做一件事,就是把事务的上下文信息改回本事务开始之前的上下文 // 由于有可能本事务是被包裹在其他事务中的,可以看spring的事务传播机制 cleanupTransactionInfo(txInfo); } // 事务执行成功后将事务的状态信息提交 commitTransactionAfterReturning(txInfo); return retVal; } else { // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. // 省略 }}createTransactionIfNecessary
[*]TransactionAttribute 事务的属性,好比我们在@Transactional内里的一些定义,利用的事务管理器、事务隔离级别、超时时间等;
[*]TransactionStatus 事务的运行时状态,如是否已完成等;
[*]TransactionInfo 事务信息的聚合,包含了事务属性、事务状态、事务管理器、被事务包裹的方法定义信息、事务执行前的外层事务信息等。
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. // 新建一个TransactionAttribute的署理对象,其实用的是装饰器模式 if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 这里会根据事务管理器获取事务对象 status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } // 将事务信息聚合然后返回,这里会有一个事务信息绑定到当火线程的操作(外层事务信息会存下来) return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);}
[*]TransactionSynchronizationManager 以threadLocal的方式保存当火线程事务相关信息的对象
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { // 会新建一个事务对象,从TransactionSynchronizationManager中获取当火线程持有的数据库连接的句柄 //如果是最开始的事务,这个句柄是会为null的,如果是内层事务,则会复用连接 Object transaction = doGetTransaction(); // 省略 // 当火线程关联的数据库连接存在且事务处于激活状态,那么当前事务会根据事务传播机制来处理当前事务 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled); } // 省略 // No existing transaction found -> check propagation behavior to find out how to proceed. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { // 省略 } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 开启事务 doBegin(transaction, definition); // 将事务信息绑定到当火线程(存在TransactionSynchronizationManager的threadLocal中) prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. // 这里就是前面说的不需要事务的情况 if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); }}
protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { // 从dataSource从获取一个连接,如果利用了连接池,则会从连接池中获取 Connection newCon = this.dataSource.getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } // 事务对象设置连接的句柄 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). // 省略 // Bind the session holder to the thread. // 绑定数据库连接到当火线程,这里的key是dataSource,以是如果事务中换了dataSource那事务就不生效了 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { // 释放连接回连接池 // 当火线程持有的连接句柄也一并释放 }}到此为止,事务的信息全部准备好了,事务也开启了,这个时候业务方法就是在事务中执行了(如果配置了需要事务的话)。
protected void commitTransactionAfterReturning(TransactionInfo txInfo) { if (txInfo != null && txInfo.hasTransaction()) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); } txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); }}
public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus); // Throw UnexpectedRollbackException only at outermost transaction boundary // or if explicitly asked to. if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } return; } processCommit(defStatus);}题外话
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { prepareForCommit(status); // 调用当火线程的TransactionSynchronization列表的对应方法 // 这里注意,beforeCompletion方法的异常是会被吞掉的,beforeCommit的异常则会传播出去 triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } //如果用到了spring的NESTED传播,底层用到了数据库的savePoint,以是这里会释放 if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } status.releaseHeldSavepoint(); } // 只有最外层的事务这里才是true else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } // 这里调用底层数据库连接的commit方法提交事务 doCommit(status); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (globalRollbackOnly) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit // 这里调用TransactionSynchronization列表的afterCompletion方法,会吞掉异常 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { // 如果提交异常这里会回滚事务,里层也是调用TransactionSynchronization列表的afterCompletion方法 // 只不过如果回滚失败,事务状态就是未知 doRollbackOnCommitException(status, ex); } else { // 单纯调用TransactionSynchronization列表的afterCompletion方法 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException ex) { if (!beforeCompletionInvoked) { // 如果前面beforeCompletion未调用,则这里调一次 triggerBeforeCompletion(status); } // 回滚事务 doRollbackOnCommitException(status, ex); throw ex; } catch (Error err) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, err); throw err; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { // 调用TransactionSynchronization列表的afterCommit方法 triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); }}
private void cleanupAfterCompletion(DefaultTransactionStatus status) { // 将事务状态设为已完成 status.setCompleted(); // 最外层事务会去清理线程绑定的资源,包含TransactionSynchronization列表 if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } if (status.isNewTransaction()) { // 从当火线程绑定的资源中移除数据库连接句柄 // 将连接的一些属性重置,恢复默认值 // 将连接还给连接池(如果没用连接池会直接关闭连接) // 解除事务与连接的绑定关系 doCleanupAfterCompletion(status.getTransaction()); } // 用于事务的挂起和恢复,这里先略过 if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); } resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources()); }}completeTransactionAfterThrowing
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.hasTransaction()) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex); } if (txInfo.transactionAttribute.rollbackOn(ex)) { try { // 里层调用的就是processRollback方法 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by rollback exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException ex2) { logger.error("Application exception overridden by rollback exception", ex); throw ex2; } catch (Error err) { logger.error("Application exception overridden by rollback error", ex); throw err; } } else { // We don't roll back on this exception. // Will still roll back if TransactionStatus.isRollbackOnly() is true. try { // 如果抛出的异常不属于回滚异常范围内,则事务依然提交 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by commit exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException ex2) { logger.error("Application exception overridden by commit exception", ex); throw ex2; } catch (Error err) { logger.error("Application exception overridden by commit error", ex); throw err; } } }}
private void processRollback(DefaultTransactionStatus status) { try { try { triggerBeforeCompletion(status); if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } // 调用数据库连接的回滚方法 doRollback(status); } else if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } } catch (RuntimeException ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } catch (Error err) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw err; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); } finally { cleanupAfterCompletion(status); }}总结
/** * Invoked after transaction commit/rollback. * Can perform resource cleanup after transaction completion. *
NOTE: The transaction will have been committed or rolled back already, * but the transactional resources might still be active and accessible. As a * consequence, any data access code triggered at this point will still "participate" * in the original transaction, allowing to perform some cleanup (with no commit * following anymore!), unless it explicitly declares that it needs to run in a * separate transaction. Hence: Use {@code PROPAGATION_REQUIRES_NEW} * for any transactional operation that is called from here. * @param status completion status according to the {@code STATUS_*} constants * @throws RuntimeException in case of errors; will be logged but not propagated * (note: do not throw TransactionException subclasses here!) * @see #STATUS_COMMITTED * @see #STATUS_ROLLED_BACK * @see #STATUS_UNKNOWN * @see #beforeCompletion */void afterCompletion(int status);