From b5e5e3307838359b42c7f4b1ed592b03b249eb4f Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Thu, 2 May 2019 12:04:25 +0200 Subject: [PATCH] Remove legacy config options from AbstractReactiveTransactionManager Includes general revision of reactive transaction sources. See gh-22646 --- .../ReactiveTransactionManager.java | 9 +- .../ReactiveTransactionStatus.java | 21 +- .../AbstractReactiveTransactionManager.java | 754 +++++------------- .../AbstractReactiveTransactionStatus.java | 35 +- .../DefaultReactiveTransactionStatus.java | 6 +- .../DefaultTransactionalOperator.java | 58 +- ...ReactiveResourceHolderSynchronization.java | 49 +- .../reactive/ReactiveTransactionCallback.java | 7 +- .../ReactiveTransactionSynchronization.java | 11 - ...tiveTransactionSynchronizationManager.java | 116 +-- ...activeTransactionSynchronizationUtils.java | 85 +- .../reactive/TransactionContext.java | 70 +- .../reactive/TransactionContextHolder.java | 36 +- .../reactive/TransactionContextManager.java | 26 +- .../reactive/TransactionalOperator.java | 28 +- .../transaction/reactive/package-info.java | 2 +- .../ReactiveTestTransactionManager.java | 6 +- .../ReactiveTransactionSupportUnitTests.java | 21 +- .../reactive/TransactionalOperatorTests.java | 24 +- 19 files changed, 385 insertions(+), 979 deletions(-) diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java index adbaf38106e4..e30a3628314d 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,8 +18,6 @@ import reactor.core.publisher.Mono; -import org.springframework.lang.Nullable; - /** * This is the central interface in Spring's reactive transaction infrastructure. * Applications can use this directly, but it is not primarily meant as API: @@ -27,6 +25,7 @@ * declarative transaction demarcation through AOP. * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 * @see org.springframework.transaction.interceptor.TransactionProxyFactoryBean */ @@ -43,7 +42,7 @@ public interface ReactiveTransactionManager { *

An exception to the above rule is the read-only flag, which should be * ignored if no explicit read-only mode is supported. Essentially, the * read-only flag is just a hint for potential optimization. - * @param definition the TransactionDefinition instance (can be empty for defaults), + * @param definition the TransactionDefinition instance, * describing propagation behavior, isolation level, timeout etc. * @return transaction status object representing the new or current transaction * @throws TransactionException in case of lookup, creation, or system errors @@ -55,7 +54,7 @@ public interface ReactiveTransactionManager { * @see TransactionDefinition#getTimeout * @see TransactionDefinition#isReadOnly */ - Mono getTransaction(@Nullable TransactionDefinition definition) throws TransactionException; + Mono getTransaction(TransactionDefinition definition) throws TransactionException; /** * Commit the given transaction, with regard to its status. If the transaction diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java index 99c3f77394e7..01d5f16ab069 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,8 +16,6 @@ package org.springframework.transaction; -import reactor.core.publisher.Mono; - /** * Representation of the status of a transaction exposing a reactive * interface. @@ -27,6 +25,7 @@ * an exception that causes an implicit rollback). * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 * @see #setRollbackOnly() * @see ReactiveTransactionManager#getTransaction @@ -45,10 +44,10 @@ public interface ReactiveTransactionStatus { * that the only possible outcome of the transaction may be a rollback, as * alternative to throwing an exception which would in turn trigger a rollback. *

This is mainly intended for transactions managed by - * {@link org.springframework.transaction.reactive.support.TransactionalOperator} or - * {@link org.springframework.transaction.interceptor.ReactiveTransactionInterceptor}, + * {@link org.springframework.transaction.reactive.TransactionalOperator} or + * {@link org.springframework.transaction.interceptor.TransactionInterceptor}, * where the actual commit/rollback decision is made by the container. - * @see org.springframework.transaction.reactive.support.ReactiveTransactionCallback#doInTransaction + * @see org.springframework.transaction.reactive.ReactiveTransactionCallback#doInTransaction * @see org.springframework.transaction.interceptor.TransactionAttribute#rollbackOn */ void setRollbackOnly(); @@ -59,15 +58,6 @@ public interface ReactiveTransactionStatus { */ boolean isRollbackOnly(); - /** - * Flush the underlying session to the datastore, if applicable. - *

This is effectively just a hint and may be a no-op if the underlying - * transaction manager does not have a flush concept. A flush signal may - * get applied to the primary resource or to transaction synchronizations, - * depending on the underlying resource. - */ - Mono flush(); - /** * Return whether this transaction is completed, that is, * whether it has already been committed or rolled back. @@ -75,4 +65,5 @@ public interface ReactiveTransactionStatus { * @see ReactiveTransactionManager#rollback */ boolean isCompleted(); + } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java index 526767192db3..849c6bced5c4 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.Serializable; -import java.time.Duration; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,25 +29,20 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import org.springframework.core.Constants; import org.springframework.lang.Nullable; import org.springframework.transaction.IllegalTransactionStateException; import org.springframework.transaction.InvalidTimeoutException; -import org.springframework.transaction.NestedTransactionNotSupportedException; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.ReactiveTransactionStatus; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; import org.springframework.transaction.TransactionSuspensionNotSupportedException; import org.springframework.transaction.UnexpectedRollbackException; -import org.springframework.transaction.ReactiveTransactionManager; -import org.springframework.transaction.ReactiveTransactionStatus; -import org.springframework.transaction.support.DefaultTransactionDefinition; -import org.springframework.transaction.support.TransactionSynchronizationManager; -import org.springframework.util.Assert; - /** * Abstract base class that implements Spring's standard reactive transaction workflow, * serving as basis for concrete platform transaction managers. + * *

This base class provides the following workflow handling: *

+ * *

Subclasses have to implement specific template methods for specific * states of a transaction, e.g.: begin, suspend, resume, commit, rollback. * The most important of them are abstract and must be provided by a concrete * implementation; for the rest, defaults are provided, so overriding is optional. + * *

Transaction synchronization is a generic mechanism for registering callbacks * that get invoked at transaction completion time. This is mainly used internally * by the data access support classes for R2DBC, MongoDB, etc. The same mechanism can * also be leveraged for custom synchronization needs in an application. + * *

The state of this class is serializable, to allow for serializing the * transaction strategy along with proxies that carry a transaction interceptor. * It is up to subclasses if they wish to make their state to be serializable too. @@ -76,248 +72,15 @@ * to Java serialization rules) if they need to restore any transient state. * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 - * @see #setTransactionSynchronization * @see ReactiveTransactionSynchronizationManager */ -@SuppressWarnings({"serial", "WeakerAccess"}) +@SuppressWarnings("serial") public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable { - /** - * Always activate transaction synchronization, even for "empty" transactions - * that result from PROPAGATION_SUPPORTS with no existing backend transaction. - * - * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_SUPPORTS - * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_NOT_SUPPORTED - * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_NEVER - */ - public static final int SYNCHRONIZATION_ALWAYS = 0; - - /** - * Activate transaction synchronization only for actual transactions, - * that is, not for empty ones that result from PROPAGATION_SUPPORTS with - * no existing backend transaction. - * - * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_REQUIRED - * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_MANDATORY - * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_REQUIRES_NEW - */ - public static final int SYNCHRONIZATION_ON_ACTUAL_TRANSACTION = 1; - - /** - * Never active transaction synchronization, not even for actual transactions. - */ - public static final int SYNCHRONIZATION_NEVER = 2; - - - /** - * Constants instance for AbstractReactiveTransactionManager. - */ - private static final Constants constants = new Constants(AbstractReactiveTransactionManager.class); - - protected transient Log logger = LogFactory.getLog(getClass()); - private int transactionSynchronization = SYNCHRONIZATION_ALWAYS; - - private Duration defaultTimeout = Duration.ofSeconds(TransactionDefinition.TIMEOUT_DEFAULT); - - private boolean nestedTransactionAllowed = false; - - private boolean validateExistingTransaction = false; - - private boolean globalRollbackOnParticipationFailure = true; - - private boolean failEarlyOnGlobalRollbackOnly = false; - - private boolean rollbackOnCommitFailure = false; - - - /** - * Set the transaction synchronization by the name of the corresponding constant - * in this class, e.g. "SYNCHRONIZATION_ALWAYS". - * @param constantName name of the constant - * @see #SYNCHRONIZATION_ALWAYS - */ - public final void setTransactionSynchronizationName(String constantName) { - setTransactionSynchronization(constants.asNumber(constantName).intValue()); - } - - /** - * Set when this transaction manager should activate the subscriber context-bound - * transaction synchronization support. Default is "always". - *

Note that transaction synchronization isn't supported for - * multiple concurrent transactions by different transaction managers. - * Only one transaction manager is allowed to activate it at any time. - * @see #SYNCHRONIZATION_ALWAYS - * @see #SYNCHRONIZATION_ON_ACTUAL_TRANSACTION - * @see #SYNCHRONIZATION_NEVER - * @see ReactiveTransactionSynchronizationManager - * @see ReactiveTransactionSynchronization - */ - public final void setTransactionSynchronization(int transactionSynchronization) { - this.transactionSynchronization = transactionSynchronization; - } - - /** - * Return if this transaction manager should activate the subscriber context-bound - * transaction synchronization support. - */ - public final int getTransactionSynchronization() { - return this.transactionSynchronization; - } - - /** - * Specify the default timeout that this transaction manager should apply - * if there is no timeout specified at the transaction level, in seconds. - *

Default is the underlying transaction infrastructure's default timeout, - * e.g. typically 30 seconds in case of a JTA provider, indicated by the - * {@code TransactionDefinition.TIMEOUT_DEFAULT} value. - * @see org.springframework.transaction.TransactionDefinition#TIMEOUT_DEFAULT - */ - public final void setDefaultTimeout(Duration defaultTimeout) { - Assert.notNull(defaultTimeout, "Default timeout must not be null"); - if (defaultTimeout.getSeconds() < TransactionDefinition.TIMEOUT_DEFAULT) { - throw new InvalidTimeoutException("Invalid default timeout", (int) defaultTimeout.getSeconds()); - } - this.defaultTimeout = defaultTimeout; - } - - /** - * Return the default timeout that this transaction manager should apply - * if there is no timeout specified at the transaction level, in seconds. - *

Returns {@code TransactionDefinition.TIMEOUT_DEFAULT} to indicate - * the underlying transaction infrastructure's default timeout. - */ - public final Duration getDefaultTimeout() { - return this.defaultTimeout; - } - - /** - * Set whether nested transactions are allowed. Default is "false". - *

Typically initialized with an appropriate default by the - * concrete transaction manager subclass. - */ - public final void setNestedTransactionAllowed(boolean nestedTransactionAllowed) { - this.nestedTransactionAllowed = nestedTransactionAllowed; - } - - /** - * Return whether nested transactions are allowed. - */ - public final boolean isNestedTransactionAllowed() { - return this.nestedTransactionAllowed; - } - - /** - * Set whether existing transactions should be validated before participating - * in them. - *

When participating in an existing transaction (e.g. with - * PROPAGATION_REQUIRED or PROPAGATION_SUPPORTS encountering an existing - * transaction), this outer transaction's characteristics will apply even - * to the inner transaction scope. Validation will detect incompatible - * isolation level and read-only settings on the inner transaction definition - * and reject participation accordingly through throwing a corresponding exception. - *

Default is "false", leniently ignoring inner transaction settings, - * simply overriding them with the outer transaction's characteristics. - * Switch this flag to "true" in order to enforce strict validation. - */ - public final void setValidateExistingTransaction(boolean validateExistingTransaction) { - this.validateExistingTransaction = validateExistingTransaction; - } - - /** - * Return whether existing transactions should be validated before participating - * in them. - */ - public final boolean isValidateExistingTransaction() { - return this.validateExistingTransaction; - } - - /** - * Set whether to globally mark an existing transaction as rollback-only - * after a participating transaction failed. - *

Default is "true": If a participating transaction (e.g. with - * PROPAGATION_REQUIRED or PROPAGATION_SUPPORTS encountering an existing - * transaction) fails, the transaction will be globally marked as rollback-only. - * The only possible outcome of such a transaction is a rollback: The - * transaction originator cannot make the transaction commit anymore. - *

Switch this to "false" to let the transaction originator make the rollback - * decision. If a participating transaction fails with an exception, the caller - * can still decide to continue with a different path within the transaction. - * However, note that this will only work as long as all participating resources - * are capable of continuing towards a transaction commit even after a data access - * failure: This is generally not the case for a Hibernate Session, for example; - * neither is it for a sequence of R2DBC insert/update/delete operations. - *

Note:This flag only applies to an explicit rollback attempt for a - * subtransaction, typically caused by an exception thrown by a data access operation - * (where TransactionInterceptor will trigger a {@code ReactiveTransactionManager.rollback()} - * call according to a rollback rule). If the flag is off, the caller can handle the exception - * and decide on a rollback, independent of the rollback rules of the subtransaction. - * This flag does, however, not apply to explicit {@code setRollbackOnly} - * calls on a {@code TransactionStatus}, which will always cause an eventual - * global rollback (as it might not throw an exception after the rollback-only call). - *

The recommended solution for handling failure of a subtransaction - * is a "nested transaction", where the global transaction can be rolled - * back to a savepoint taken at the beginning of the subtransaction. - * PROPAGATION_NESTED provides exactly those semantics; however, it will - * only work when nested transaction support is available. This is the case - * with DataSourceTransactionManager, but not with JtaTransactionManager. - * @see #setNestedTransactionAllowed - */ - public final void setGlobalRollbackOnParticipationFailure(boolean globalRollbackOnParticipationFailure) { - this.globalRollbackOnParticipationFailure = globalRollbackOnParticipationFailure; - } - - /** - * Return whether to globally mark an existing transaction as rollback-only - * after a participating transaction failed. - */ - public final boolean isGlobalRollbackOnParticipationFailure() { - return this.globalRollbackOnParticipationFailure; - } - - /** - * Set whether to fail early in case of the transaction being globally marked - * as rollback-only. - *

Default is "false", only causing an UnexpectedRollbackException at the - * outermost transaction boundary. Switch this flag on to cause an - * UnexpectedRollbackException as early as the global rollback-only marker - * has been first detected, even from within an inner transaction boundary. - * @see org.springframework.transaction.UnexpectedRollbackException - */ - public final void setFailEarlyOnGlobalRollbackOnly(boolean failEarlyOnGlobalRollbackOnly) { - this.failEarlyOnGlobalRollbackOnly = failEarlyOnGlobalRollbackOnly; - } - - /** - * Return whether to fail early in case of the transaction being globally marked - * as rollback-only. - */ - public final boolean isFailEarlyOnGlobalRollbackOnly() { - return this.failEarlyOnGlobalRollbackOnly; - } - - /** - * Set whether {@code doRollback} should be performed on failure of the - * {@code doCommit} call. Typically not necessary and thus to be avoided, - * as it can potentially override the commit exception with a subsequent - * rollback exception. - *

Default is "false". - * @see #doCommit - * @see #doRollback - */ - public final void setRollbackOnCommitFailure(boolean rollbackOnCommitFailure) { - this.rollbackOnCommitFailure = rollbackOnCommitFailure; - } - - /** - * Return whether {@code doRollback} should be performed on failure of the - * {@code doCommit} call. - */ - public final boolean isRollbackOnCommitFailure() { - return this.rollbackOnCommitFailure; - } //--------------------------------------------------------------------- // Implementation of ReactiveTransactionManager @@ -332,15 +95,7 @@ public final boolean isRollbackOnCommitFailure() { * @see #doBegin */ @Override - public final Mono getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { - - if (definition == null) { - // Use defaults if no transaction definition given. - definition = new DefaultTransactionDefinition(); - } - - TransactionDefinition definitionToUse = definition; - + public final Mono getTransaction(TransactionDefinition definition) throws TransactionException { return ReactiveTransactionSynchronizationManager.currentTransaction() .flatMap(synchronizationManager -> { @@ -351,58 +106,52 @@ public final Mono getTransaction(@Nullable Transactio if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. - return handleExistingTransaction(synchronizationManager, definitionToUse, transaction, debugEnabled); + return handleExistingTransaction(synchronizationManager, definition, transaction, debugEnabled); } // Check definition settings for new transaction. - if (definitionToUse.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { - return Mono.error(new InvalidTimeoutException("Invalid transaction timeout", definitionToUse.getTimeout())); + if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { + return Mono.error(new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout())); } // No existing transaction found -> check propagation behavior to find out how to proceed. - if (definitionToUse.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { + if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { return Mono.error(new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'")); - } else if (definitionToUse.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || - definitionToUse.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || - definitionToUse.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { + } + else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || + definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || + definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { return TransactionContextManager.currentContext() .map(ReactiveTransactionSynchronizationManager::new) - .flatMap(nestedSynchronizationManager -> { - - return suspend(nestedSynchronizationManager, null) - .map(Optional::of) - .defaultIfEmpty(Optional.empty()) - .flatMap(suspendedResources -> { - - if (debugEnabled) { - logger.debug("Creating new transaction with name [" + definitionToUse.getName() + "]: " + definitionToUse); - } - - return Mono.defer(() -> { - boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); - DefaultReactiveTransactionStatus status = newTransactionStatus( - nestedSynchronizationManager, definitionToUse, transaction, true, - newSynchronization, debugEnabled, suspendedResources.orElse(null)); - - return doBegin(nestedSynchronizationManager, transaction, definitionToUse) - .doOnSuccess(ignore -> prepareSynchronization(nestedSynchronizationManager, status, definitionToUse)) - .thenReturn(status); - }).onErrorResume(ErrorPredicates.RuntimeOrError, e -> { - return resume(nestedSynchronizationManager, null, suspendedResources.orElse(null)) - .then(Mono.error(e)); - }); - }); - }); - } else { + .flatMap(nestedSynchronizationManager -> + suspend(nestedSynchronizationManager, null) + .map(Optional::of) + .defaultIfEmpty(Optional.empty()) + .flatMap(suspendedResources -> { + if (debugEnabled) { + logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); + } + return Mono.defer(() -> { + DefaultReactiveTransactionStatus status = newTransactionStatus( + nestedSynchronizationManager, definition, transaction, true, + debugEnabled, suspendedResources.orElse(null)); + return doBegin(nestedSynchronizationManager, transaction, definition) + .doOnSuccess(ignore -> prepareSynchronization(nestedSynchronizationManager, status, definition)) + .thenReturn(status); + }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, + ex -> resume(nestedSynchronizationManager, null, suspendedResources.orElse(null)) + .then(Mono.error(ex))); + })); + } + else { // Create "empty" transaction: no actual transaction, but potentially synchronization. - if (definitionToUse.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { + if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + - "isolation level will effectively be ignored: " + definitionToUse); + "isolation level will effectively be ignored: " + definition); } - boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); - return Mono.just(prepareTransactionStatus(synchronizationManager, definitionToUse, null, true, newSynchronization, debugEnabled, null)); + return Mono.just(prepareTransactionStatus(synchronizationManager, definition, null, true, debugEnabled, null)); } }); } @@ -411,8 +160,7 @@ public final Mono getTransaction(@Nullable Transactio * Create a TransactionStatus for an existing transaction. */ private Mono handleExistingTransaction(ReactiveTransactionSynchronizationManager synchronizationManager, - TransactionDefinition definition, Object transaction, boolean debugEnabled) - throws TransactionException { + TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { return Mono.error(new IllegalTransactionStateException( @@ -424,12 +172,10 @@ private Mono handleExistingTransaction(ReactiveTransa logger.debug("Suspending current transaction"); } Mono suspend = suspend(synchronizationManager, transaction); - boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); - return suspend.map(suspendedResources -> prepareTransactionStatus(synchronizationManager, - definition, null, false, newSynchronization, debugEnabled, suspendedResources)) // + definition, null, false, debugEnabled, suspendedResources)) // .switchIfEmpty(Mono.fromSupplier(() -> prepareTransactionStatus(synchronizationManager, - definition, null, false, newSynchronization, debugEnabled, null))) + definition, null, false, debugEnabled, null))) .cast(ReactiveTransactionStatus.class); } @@ -439,82 +185,46 @@ private Mono handleExistingTransaction(ReactiveTransa definition.getName() + "]"); } Mono suspendedResources = suspend(synchronizationManager, transaction); - boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); - return suspendedResources.flatMap(suspendedResourcesHolder -> { - DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager, - definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); - return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> { - prepareSynchronization(synchronizationManager, status, definition); - }).thenReturn(status). - - onErrorResume(ErrorPredicates.RuntimeOrError, beginEx -> { - return resumeAfterBeginException(synchronizationManager, transaction, suspendedResourcesHolder, beginEx).then(Mono.error(beginEx)); - }); + definition, transaction, true, debugEnabled, suspendedResources); + return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> + prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status) + .onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, beginEx -> + resumeAfterBeginException(synchronizationManager, transaction, suspendedResourcesHolder, beginEx).then(Mono.error(beginEx))); }); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { - if (!isNestedTransactionAllowed()) { - return Mono.error(new NestedTransactionNotSupportedException( - "Transaction manager does not allow nested transactions by default - " + - "specify 'nestedTransactionAllowed' property with value 'true'")); - } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } - // Nested transaction through nested begin and commit/rollback calls. - boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager, - definition, transaction, true, newSynchronization, debugEnabled, null); - - return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> { - prepareSynchronization(synchronizationManager, status, definition); - }).thenReturn(status); + definition, transaction, true, debugEnabled, null); + return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> + prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status); } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } - if (isValidateExistingTransaction()) { - if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { - Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); - if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { - Constants isoConstants = new Constants(TransactionDefinition.class); - return Mono.error(new IllegalTransactionStateException("Participating transaction with definition [" + - definition + "] specifies isolation level which is incompatible with existing transaction: " + - (currentIsolationLevel != null ? - isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : - "(unknown)"))); - } - } - if (!definition.isReadOnly()) { - if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { - return Mono.error(new IllegalTransactionStateException("Participating transaction with definition [" + - definition + "] is not marked as read-only but existing transaction is")); - } - } - } - boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); - return Mono.just(prepareTransactionStatus(synchronizationManager, definition, transaction, false, newSynchronization, debugEnabled, null)); + return Mono.just(prepareTransactionStatus(synchronizationManager, definition, transaction, false, debugEnabled, null)); } /** * Create a new TransactionStatus for the given arguments, * also initializing transaction synchronization as appropriate. - * * @see #newTransactionStatus * @see #prepareTransactionStatus */ - protected final DefaultReactiveTransactionStatus prepareTransactionStatus( - ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, - boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { + private DefaultReactiveTransactionStatus prepareTransactionStatus( + ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, + @Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) { DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager, - definition, transaction, newTransaction, newSynchronization, debug, suspendedResources); + definition, transaction, newTransaction, debug, suspendedResources); prepareSynchronization(synchronizationManager, status, definition); return status; } @@ -522,21 +232,21 @@ protected final DefaultReactiveTransactionStatus prepareTransactionStatus( /** * Create a TransactionStatus instance for the given arguments. */ - protected DefaultReactiveTransactionStatus newTransactionStatus( - ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, - boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { - - boolean actualNewSynchronization = newSynchronization && - !synchronizationManager.isSynchronizationActive(); - return new DefaultReactiveTransactionStatus( - transaction, newTransaction, actualNewSynchronization, + private DefaultReactiveTransactionStatus newTransactionStatus( + ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, + @Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) { + + return new DefaultReactiveTransactionStatus(transaction, newTransaction, + !synchronizationManager.isSynchronizationActive(), definition.isReadOnly(), debug, suspendedResources); } /** * Initialize transaction synchronization as appropriate. */ - protected void prepareSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status, TransactionDefinition definition) { + private void prepareSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status, TransactionDefinition definition) { + if (status.isNewSynchronization()) { synchronizationManager.setActualTransactionActive(status.hasTransaction()); synchronizationManager.setCurrentTransactionIsolationLevel( @@ -548,22 +258,6 @@ protected void prepareSynchronization(ReactiveTransactionSynchronizationManager } } - /** - * Determine the actual timeout to use for the given definition. - * Will fall back to this manager's default timeout if the - * transaction definition doesn't specify a non-default value. - * @param definition the transaction definition - * @return the actual timeout to use - * @see org.springframework.transaction.TransactionDefinition#getTimeout() - * @see #setDefaultTimeout - */ - protected Duration determineTimeout(TransactionDefinition definition) { - if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) { - return Duration.ofSeconds(definition.getTimeout()); - } - return this.defaultTimeout; - } - /** * Suspend the given transaction. Suspends transaction synchronization first, @@ -576,16 +270,14 @@ protected Duration determineTimeout(TransactionDefinition definition) { * @see #doSuspend * @see #resume */ - protected final Mono suspend(ReactiveTransactionSynchronizationManager synchronizationManager, @Nullable Object transaction) throws TransactionException { + private Mono suspend(ReactiveTransactionSynchronizationManager synchronizationManager, + @Nullable Object transaction) throws TransactionException { + if (synchronizationManager.isSynchronizationActive()) { Mono> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager); - return suspendedSynchronizations.flatMap(synchronizations -> { - - Mono> suspendedResources = transaction != null ? doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) : Mono.just(Optional.empty()); - + Mono> suspendedResources = (transaction != null ? doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) : Mono.just(Optional.empty())); return suspendedResources.map(it -> { - String name = synchronizationManager.getCurrentTransactionName(); synchronizationManager.setCurrentTransactionName(null); boolean readOnly = synchronizationManager.isCurrentTransactionReadOnly(); @@ -596,13 +288,15 @@ protected final Mono suspend(ReactiveTransactionSynchr synchronizationManager.setActualTransactionActive(false); return new SuspendedResourcesHolder( it.orElse(null), synchronizations, name, readOnly, isolationLevel, wasActive); - }).onErrorResume(ErrorPredicates.RuntimeOrError, t -> doResumeSynchronization(synchronizationManager, synchronizations).cast(SuspendedResourcesHolder.class)); + }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> doResumeSynchronization(synchronizationManager, synchronizations).cast(SuspendedResourcesHolder.class)); }); - } else if (transaction != null) { + } + else if (transaction != null) { // Transaction active but no synchronization active. Mono> suspendedResources = doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()); return suspendedResources.map(it -> new SuspendedResourcesHolder(it.orElse(null))); - } else { + } + else { // Neither transaction nor synchronization active. return Mono.empty(); } @@ -619,7 +313,8 @@ protected final Mono suspend(ReactiveTransactionSynchr * @see #doResume * @see #suspend */ - protected final Mono resume(ReactiveTransactionSynchronizationManager synchronizationManager, @Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) + private Mono resume(ReactiveTransactionSynchronizationManager synchronizationManager, + @Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) throws TransactionException { if (resourcesHolder != null) { @@ -644,23 +339,23 @@ protected final Mono resume(ReactiveTransactionSynchronizationManager sync * Resume outer transaction after inner transaction begin failed. */ private Mono resumeAfterBeginException(ReactiveTransactionSynchronizationManager synchronizationManager, - Object transaction, @Nullable SuspendedResourcesHolder suspendedResources, Throwable beginEx) { + Object transaction, @Nullable SuspendedResourcesHolder suspendedResources, Throwable beginEx) { String exMessage = "Inner transaction begin exception overridden by outer transaction resume exception"; - return resume(synchronizationManager, transaction, suspendedResources).doOnError(ErrorPredicates.RuntimeOrError, t -> logger.error(exMessage, beginEx)); + return resume(synchronizationManager, transaction, suspendedResources).doOnError(ErrorPredicates.RUNTIME_OR_ERROR, + ex -> logger.error(exMessage, beginEx)); } /** * Suspend all current synchronizations and deactivate transaction * synchronization for the current transaction context. - * * @param synchronizationManager the synchronization manager bound to the current transaction * @return the List of suspended ReactiveTransactionSynchronization objects */ - private Mono> doSuspendSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager) { - List suspendedSynchronizations = - synchronizationManager.getSynchronizations(); + private Mono> doSuspendSynchronization( + ReactiveTransactionSynchronizationManager synchronizationManager) { + List suspendedSynchronizations = synchronizationManager.getSynchronizations(); return Flux.fromIterable(suspendedSynchronizations) .concatMap(ReactiveTransactionSynchronization::suspend) .then(Mono.defer(() -> { @@ -675,14 +370,13 @@ private Mono> doSuspendSynchronization( * @param synchronizationManager the synchronization manager bound to the current transaction * @param suspendedSynchronizations a List of ReactiveTransactionSynchronization objects */ - private Mono doResumeSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager, List suspendedSynchronizations) { - synchronizationManager.initSynchronization(); + private Mono doResumeSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager, + List suspendedSynchronizations) { + synchronizationManager.initSynchronization(); return Flux.fromIterable(suspendedSynchronizations) - .concatMap(synchronization -> { - return synchronization.resume() - .doOnSuccess(ignore -> synchronizationManager.registerSynchronization(synchronization)); - }).then(); + .concatMap(synchronization -> synchronization.resume() + .doOnSuccess(ignore -> synchronizationManager.registerSynchronization(synchronization))).then(); } @@ -703,22 +397,13 @@ public final Mono commit(ReactiveTransactionStatus status) throws Transact } return ReactiveTransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { - DefaultReactiveTransactionStatus defStatus = (DefaultReactiveTransactionStatus) status; - if (defStatus.isLocalRollbackOnly()) { + if (defStatus.isRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } - return processRollback(synchronizationManager, defStatus, false); - } - - if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { - if (defStatus.isDebug()) { - logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); - } - return processRollback(synchronizationManager, defStatus, true); + return processRollback(synchronizationManager, defStatus); } - return processCommit(synchronizationManager, defStatus); }); } @@ -730,90 +415,55 @@ public final Mono commit(ReactiveTransactionStatus status) throws Transact * @param status object representing the transaction * @throws TransactionException in case of commit failure */ - private Mono processCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) throws TransactionException { + private Mono processCommit(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status) throws TransactionException { AtomicBoolean beforeCompletionInvoked = new AtomicBoolean(false); - AtomicBoolean unexpectedRollback = new AtomicBoolean(false); Mono commit = prepareForCommit(synchronizationManager, status) .then(triggerBeforeCommit(synchronizationManager, status)) .then(triggerBeforeCompletion(synchronizationManager, status)) .then(Mono.defer(() -> { - beforeCompletionInvoked.set(true); - if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } - unexpectedRollback.set(status.isGlobalRollbackOnly()); return doCommit(synchronizationManager, status); - } else if (isFailEarlyOnGlobalRollbackOnly()) { - unexpectedRollback.set(status.isGlobalRollbackOnly()); - } - - return Mono.empty(); - })).then(Mono.defer(() -> { - - // Throw UnexpectedRollbackException if we have a global rollback-only - // marker but still didn't get a corresponding exception from commit. - if (unexpectedRollback.get()) { - return Mono.error(new UnexpectedRollbackException( - "Transaction silently rolled back because it has been marked as rollback-only")); } - return Mono.empty(); - }).onErrorResume(e -> { - - Mono propagateException = Mono.error(e); - if (ErrorPredicates.UnexpectedRollback.test(e)) { + })).then(Mono.empty().onErrorResume(ex -> { + Mono propagateException = Mono.error(ex); + if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) { return triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK).then(propagateException); } - - if (ErrorPredicates.TransactionException.test(e)) { - - Mono mono; - // can only be caused by doCommit - if (isRollbackOnCommitFailure()) { - mono = doRollbackOnCommitException(synchronizationManager, status, e); - } else { - mono = triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN); - } - return mono.then(propagateException); + if (ErrorPredicates.TRANSACTION_EXCEPTION.test(ex)) { + triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN).then(propagateException); } - - if (ErrorPredicates.RuntimeOrError.test(e)) { - + if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) { Mono mono; if (!beforeCompletionInvoked.get()) { mono = triggerBeforeCompletion(synchronizationManager, status); - } else { + } + else { mono = Mono.empty(); } - return mono.then(doRollbackOnCommitException(synchronizationManager, status, e)).then(propagateException); + return mono.then(doRollbackOnCommitException(synchronizationManager, status, ex)).then(propagateException); } return propagateException; - })).then(Mono.defer((() -> { - - // Trigger afterCommit callbacks, with an exception thrown there - // propagated to callers but the transaction still considered as committed. - - return triggerAfterCommit(synchronizationManager, status).onErrorResume(e -> { - return triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_COMMITTED).then(Mono.error(e)); - }).then(triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_COMMITTED)); - }))); + })).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex -> + triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex))) + .then(triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_COMMITTED)))); return commit - .onErrorResume((e) -> { - return cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(e)); - }).then(cleanupAfterCompletion(synchronizationManager, status)); + .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status) + .then(Mono.error(ex))).then(cleanupAfterCompletion(synchronizationManager, status)); } /** - * This implementation of rollback handles participating in existing - * transactions. Delegates to {@code doRollback} and - * {@code doSetRollbackOnly}. + * This implementation of rollback handles participating in existing transactions. + * Delegates to {@code doRollback} and {@code doSetRollbackOnly}. * @see #doRollback * @see #doSetRollbackOnly */ @@ -823,10 +473,9 @@ public final Mono rollback(ReactiveTransactionStatus status) throws Transa return Mono.error(new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction")); } - return ReactiveTransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { DefaultReactiveTransactionStatus defStatus = (DefaultReactiveTransactionStatus) status; - return processRollback(synchronizationManager, defStatus, false); + return processRollback(synchronizationManager, defStatus); }); } @@ -837,90 +486,64 @@ public final Mono rollback(ReactiveTransactionStatus status) throws Transa * @param status object representing the transaction * @throws TransactionException in case of rollback failure */ - private Mono processRollback(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status, boolean unexpected) { - AtomicBoolean unexpectedRollback = new AtomicBoolean(unexpected); - - return triggerBeforeCompletion(synchronizationManager, status) - .then(Mono.defer(() -> { + private Mono processRollback(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status) { + return triggerBeforeCompletion(synchronizationManager, status).then(Mono.defer(() -> { if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } return doRollback(synchronizationManager, status); - } else { - + } + else { Mono beforeCompletion = Mono.empty(); // Participating in larger transaction if (status.hasTransaction()) { - if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { - if (status.isDebug()) { - logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); - } - beforeCompletion = doSetRollbackOnly(synchronizationManager, status); - } else { - if (status.isDebug()) { - logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); - } + if (status.isDebug()) { + logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } - } else { + beforeCompletion = doSetRollbackOnly(synchronizationManager, status); + } + else { logger.debug("Should roll back transaction but cannot - no transaction available"); } - - return beforeCompletion.doOnSuccess(ignore -> { - - // Unexpected rollback only matters here if we're asked to fail early - if (!isFailEarlyOnGlobalRollbackOnly()) { - unexpectedRollback.set(false); - } - }); + return beforeCompletion; } - })).onErrorResume(ErrorPredicates.RuntimeOrError, e -> triggerAfterCompletion(synchronizationManager, - status, ReactiveTransactionSynchronization.STATUS_UNKNOWN) - .then(Mono.error(e))) - .then(Mono.defer(() -> { - - Mono afterCompletion = triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK); - - // Raise UnexpectedRollbackException if we had a global rollback-only marker - if (unexpectedRollback.get()) { - return afterCompletion.then(Mono.error(new UnexpectedRollbackException( - "Transaction rolled back because it has been marked as rollback-only"))); - } - return afterCompletion; - })).onErrorResume((e) -> cleanupAfterCompletion(synchronizationManager, status) - .then(Mono.error(e))) + })).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> triggerAfterCompletion( + synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN) + .then(Mono.error(ex))) + .then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK))) + .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(ex))) .then(cleanupAfterCompletion(synchronizationManager, status)); } /** * Invoke {@code doRollback}, handling rollback exceptions properly. - * * @param synchronizationManager the synchronization manager bound to the current transaction * @param status object representing the transaction * @param ex the thrown application exception or error * @throws TransactionException in case of rollback failure * @see #doRollback */ - private Mono doRollbackOnCommitException(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status, Throwable ex) throws TransactionException { + private Mono doRollbackOnCommitException(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status, Throwable ex) throws TransactionException { return Mono.defer(() -> { - if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback after commit exception", ex); } return doRollback(synchronizationManager, status); - } else if (status.hasTransaction() && isGlobalRollbackOnParticipationFailure()) { + } + else if (status.hasTransaction()) { if (status.isDebug()) { logger.debug("Marking existing transaction as rollback-only after commit exception", ex); } return doSetRollbackOnly(synchronizationManager, status); } - return Mono.empty(); - }).onErrorResume(ErrorPredicates.RuntimeOrError, (rbex) -> { - + }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, rbex -> { logger.error("Commit exception overridden by rollback exception", ex); return triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN) .then(Mono.error(rbex)); @@ -933,7 +556,9 @@ private Mono doRollbackOnCommitException(ReactiveTransactionSynchronizatio * @param synchronizationManager the synchronization manager bound to the current transaction * @param status object representing the transaction */ - protected final Mono triggerBeforeCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + private Mono triggerBeforeCommit(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status) { + if (status.isNewSynchronization()) { if (status.isDebug()) { logger.trace("Triggering beforeCommit synchronization"); @@ -949,7 +574,9 @@ protected final Mono triggerBeforeCommit(ReactiveTransactionSynchronizatio * @param synchronizationManager the synchronization manager bound to the current transaction * @param status object representing the transaction */ - protected final Mono triggerBeforeCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + private Mono triggerBeforeCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status) { + if (status.isNewSynchronization()) { if (status.isDebug()) { logger.trace("Triggering beforeCompletion synchronization"); @@ -965,7 +592,9 @@ protected final Mono triggerBeforeCompletion(ReactiveTransactionSynchroniz * @param synchronizationManager the synchronization manager bound to the current transaction * @param status object representing the transaction */ - private Mono triggerAfterCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + private Mono triggerAfterCommit(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status) { + if (status.isNewSynchronization()) { if (status.isDebug()) { logger.trace("Triggering afterCommit synchronization"); @@ -982,7 +611,9 @@ private Mono triggerAfterCommit(ReactiveTransactionSynchronizationManager * @param status object representing the transaction * @param completionStatus completion status according to ReactiveTransactionSynchronization constants */ - private Mono triggerAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status, int completionStatus) { + private Mono triggerAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status, int completionStatus) { + if (status.isNewSynchronization()) { List synchronizations = synchronizationManager.getSynchronizations(); synchronizationManager.clearSynchronization(); @@ -993,7 +624,8 @@ private Mono triggerAfterCompletion(ReactiveTransactionSynchronizationMana // No transaction or new transaction for the current scope -> // invoke the afterCompletion callbacks immediately return invokeAfterCompletion(synchronizationManager, synchronizations, completionStatus); - } else if (!synchronizations.isEmpty()) { + } + else if (!synchronizations.isEmpty()) { // Existing transaction that we participate in, controlled outside // of the scope of this Spring transaction manager -> try to register // an afterCompletion callback with the existing (JTA) transaction. @@ -1018,7 +650,9 @@ private Mono triggerAfterCompletion(ReactiveTransactionSynchronizationMana * @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN */ - protected final Mono invokeAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, List synchronizations, int completionStatus) { + private Mono invokeAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, + List synchronizations, int completionStatus) { + return ReactiveTransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus); } @@ -1029,10 +663,10 @@ protected final Mono invokeAfterCompletion(ReactiveTransactionSynchronizat * @param status object representing the transaction * @see #doCleanupAfterCompletion */ - private Mono cleanupAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + private Mono cleanupAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status) { return Mono.defer(() -> { - status.setCompleted(); if (status.isNewSynchronization()) { synchronizationManager.clear(); @@ -1047,11 +681,11 @@ private Mono cleanupAfterCompletion(ReactiveTransactionSynchronizationMana Object transaction = (status.hasTransaction() ? status.getTransaction() : null); return resume(synchronizationManager, transaction, (SuspendedResourcesHolder) status.getSuspendedResources()); } - return Mono.empty(); }); } + //--------------------------------------------------------------------- // Template methods to be implemented in subclasses //--------------------------------------------------------------------- @@ -1106,19 +740,20 @@ protected boolean isExistingTransaction(Object transaction) throws TransactionEx *

This method gets called when the transaction manager has decided to actually * start a new transaction. Either there wasn't any transaction before, or the * previous transaction has been suspended. - *

A special scenario is a nested transaction without savepoint: If - * {@code useSavepointForNestedTransaction()} returns "false", this method - * will be called to start a nested transaction when necessary. In such a context, - * there will be an active transaction: The implementation of this method has - * to detect this and start an appropriate nested transaction. + *

A special scenario is a nested transaction: This method will be called to + * start a nested transaction when necessary. In such a context, there will be an + * active transaction: The implementation of this method has to detect this and + * start an appropriate nested transaction. * @param synchronizationManager the synchronization manager bound to the new transaction * @param transaction transaction object returned by {@code doGetTransaction} * @param definition a TransactionDefinition instance, describing propagation * behavior, isolation level, read-only flag, timeout, and transaction name * @throws TransactionException in case of creation or system errors + * @throws org.springframework.transaction.NestedTransactionNotSupportedException + * if the underlying transaction does not support nesting (e.g. through savepoints) */ - protected abstract Mono doBegin(ReactiveTransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) - throws TransactionException; + protected abstract Mono doBegin(ReactiveTransactionSynchronizationManager synchronizationManager, + Object transaction, TransactionDefinition definition) throws TransactionException; /** * Suspend the resources of the current transaction. @@ -1133,7 +768,9 @@ protected abstract Mono doBegin(ReactiveTransactionSynchronizationManager * @throws TransactionException in case of system errors * @see #doResume */ - protected Mono doSuspend(ReactiveTransactionSynchronizationManager synchronizationManager, Object transaction) throws TransactionException { + protected Mono doSuspend(ReactiveTransactionSynchronizationManager synchronizationManager, + Object transaction) throws TransactionException { + throw new TransactionSuspensionNotSupportedException( "Transaction manager [" + getClass().getName() + "] does not support transaction suspension"); } @@ -1151,40 +788,13 @@ protected Mono doSuspend(ReactiveTransactionSynchronizationManager synch * @throws TransactionException in case of system errors * @see #doSuspend */ - protected Mono doResume(ReactiveTransactionSynchronizationManager synchronizationManager, @Nullable Object transaction, Object suspendedResources) throws TransactionException { + protected Mono doResume(ReactiveTransactionSynchronizationManager synchronizationManager, + @Nullable Object transaction, Object suspendedResources) throws TransactionException { + throw new TransactionSuspensionNotSupportedException( "Transaction manager [" + getClass().getName() + "] does not support transaction suspension"); } - /** - * Return whether to call {@code doCommit} on a transaction that has been - * marked as rollback-only in a global fashion. - *

Does not apply if an application locally sets the transaction to rollback-only - * via the TransactionStatus, but only to the transaction itself being marked as - * rollback-only by the transaction coordinator. - *

Default is "false": Local transaction strategies usually don't hold the rollback-only - * marker in the transaction itself, therefore they can't handle rollback-only transactions - * as part of transaction commit. Hence, AbstractReactiveTransactionManager will trigger - * a rollback in that case, throwing an UnexpectedRollbackException afterwards. - *

Override this to return "true" if the concrete transaction manager expects a - * {@code doCommit} call even for a rollback-only transaction, allowing for - * special handling there. This will, for example, be the case for JTA, where - * {@code UserTransaction.commit} will check the read-only flag itself and - * throw a corresponding RollbackException, which might include the specific reason - * (such as a transaction timeout). - *

If this method returns "true" but the {@code doCommit} implementation does not - * throw an exception, this transaction manager will throw an UnexpectedRollbackException - * itself. - * @see #doCommit - * @see DefaultReactiveTransactionStatus#isGlobalRollbackOnly() - * @see DefaultReactiveTransactionStatus#isLocalRollbackOnly() - * @see ReactiveTransactionStatus#setRollbackOnly() - * @see org.springframework.transaction.UnexpectedRollbackException - */ - protected boolean shouldCommitOnGlobalRollbackOnly() { - return false; - } - /** * Make preparations for commit, to be performed before the * {@code beforeCommit} synchronization callbacks occur. @@ -1195,7 +805,9 @@ protected boolean shouldCommitOnGlobalRollbackOnly() { * @throws RuntimeException in case of errors; will be propagated to the caller * (note: do not throw TransactionException subclasses here!) */ - protected Mono prepareForCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + protected Mono prepareForCommit(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status) { + return Mono.empty(); } @@ -1210,7 +822,8 @@ protected Mono prepareForCommit(ReactiveTransactionSynchronizationManager * @throws TransactionException in case of commit or system errors * @see DefaultReactiveTransactionStatus#getTransaction */ - protected abstract Mono doCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) throws TransactionException; + protected abstract Mono doCommit(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status) throws TransactionException; /** * Perform an actual rollback of the given transaction. @@ -1222,7 +835,8 @@ protected Mono prepareForCommit(ReactiveTransactionSynchronizationManager * @throws TransactionException in case of system errors * @see DefaultReactiveTransactionStatus#getTransaction */ - protected abstract Mono doRollback(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) throws TransactionException; + protected abstract Mono doRollback(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status) throws TransactionException; /** * Set the given transaction rollback-only. Only called on rollback @@ -1234,10 +848,12 @@ protected Mono prepareForCommit(ReactiveTransactionSynchronizationManager * @param status the status representation of the transaction * @throws TransactionException in case of system errors */ - protected Mono doSetRollbackOnly(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) throws TransactionException { + protected Mono doSetRollbackOnly(ReactiveTransactionSynchronizationManager synchronizationManager, + DefaultReactiveTransactionStatus status) throws TransactionException { + throw new IllegalTransactionStateException( "Participating in existing transactions is not supported - when 'isExistingTransaction' " + - "returns true, appropriate 'doSetRollbackOnly' behavior must be provided"); + "returns true, appropriate 'doSetRollbackOnly' behavior must be provided"); } /** @@ -1257,7 +873,7 @@ protected Mono doSetRollbackOnly(ReactiveTransactionSynchronizationManager * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN */ protected Mono registerAfterCompletionWithExistingTransaction(ReactiveTransactionSynchronizationManager synchronizationManager, - Object transaction, List synchronizations) throws TransactionException { + Object transaction, List synchronizations) throws TransactionException { logger.debug("Cannot register Spring after-completion synchronization with existing transaction - " + "processing Spring after-completion callbacks immediately, with outcome status 'unknown'"); @@ -1272,10 +888,13 @@ protected Mono registerAfterCompletionWithExistingTransaction(ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param transaction transaction object returned by {@code doGetTransaction} */ - protected Mono doCleanupAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, Object transaction) { + protected Mono doCleanupAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, + Object transaction) { + return Mono.empty(); } + //--------------------------------------------------------------------- // Serialization support //--------------------------------------------------------------------- @@ -1311,7 +930,7 @@ protected static final class SuspendedResourcesHolder { private boolean wasActive; - private SuspendedResourcesHolder(Object suspendedResources) { + private SuspendedResourcesHolder(@Nullable Object suspendedResources) { this.suspendedResources = suspendedResources; } @@ -1328,15 +947,16 @@ private SuspendedResourcesHolder( } } + /** * Predicates for exception types that transactional error handling applies to. */ - enum ErrorPredicates implements Predicate { + private enum ErrorPredicates implements Predicate { /** * Predicate matching {@link RuntimeException} or {@link Error}. */ - RuntimeOrError { + RUNTIME_OR_ERROR { @Override public boolean test(Throwable throwable) { return throwable instanceof RuntimeException || throwable instanceof Error; @@ -1346,7 +966,7 @@ public boolean test(Throwable throwable) { /** * Predicate matching {@link TransactionException}. */ - TransactionException { + TRANSACTION_EXCEPTION { @Override public boolean test(Throwable throwable) { return throwable instanceof TransactionException; @@ -1356,7 +976,7 @@ public boolean test(Throwable throwable) { /** * Predicate matching {@link UnexpectedRollbackException}. */ - UnexpectedRollback { + UNEXPECTED_ROLLBACK { @Override public boolean test(Throwable throwable) { return throwable instanceof UnexpectedRollbackException; diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java index f3466906cb8d..6b7552004c17 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java @@ -16,8 +16,6 @@ package org.springframework.transaction.reactive; -import reactor.core.publisher.Mono; - import org.springframework.transaction.ReactiveTransactionStatus; /** @@ -29,6 +27,7 @@ * underlying transaction object, and no transaction synchronization mechanism. * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 * @see #setRollbackOnly() * @see #isRollbackOnly() @@ -52,42 +51,14 @@ public void setRollbackOnly() { this.rollbackOnly = true; } - /** - * Determine the rollback-only flag via checking both the local rollback-only flag - * of this TransactionStatus and the global rollback-only flag of the underlying - * transaction, if any. - * @see #isLocalRollbackOnly() - * @see #isGlobalRollbackOnly() - */ - @Override - public boolean isRollbackOnly() { - return (isLocalRollbackOnly() || isGlobalRollbackOnly()); - } - /** * Determine the rollback-only flag via checking this ReactiveTransactionStatus. *

Will only return "true" if the application called {@code setRollbackOnly} * on this TransactionStatus object. */ - public boolean isLocalRollbackOnly() { - return this.rollbackOnly; - } - - /** - * Template method for determining the global rollback-only flag of the - * underlying transaction, if any. - *

This implementation always returns {@code false}. - */ - public boolean isGlobalRollbackOnly() { - return false; - } - - /** - * This implementations is empty, considering flush as a no-op. - */ @Override - public Mono flush() { - return Mono.empty(); + public boolean isRollbackOnly() { + return this.rollbackOnly; } /** diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java index 978b33d04622..14dd58782ee8 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java @@ -21,8 +21,8 @@ import org.springframework.util.Assert; /** - * Default implementation of the {@link ReactiveTransactionStatus} - * interface, used by {@link AbstractReactiveTransactionManager}. Based on the concept + * Default implementation of the {@link ReactiveTransactionStatus} interface, + * used by {@link AbstractReactiveTransactionManager}. Based on the concept * of an underlying "transaction object". * *

Holds all status information that {@link AbstractReactiveTransactionManager} @@ -33,6 +33,7 @@ * implementations, in particular not for mock transaction managers in testing environments. * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 * @see AbstractReactiveTransactionManager * @see #getTransaction @@ -72,6 +73,7 @@ public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactio public DefaultReactiveTransactionStatus( @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean readOnly, boolean debug, @Nullable Object suspendedResources) { + this.transaction = transaction; this.newTransaction = newTransaction; this.newSynchronization = newSynchronization; diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java index 77ce6ee90c59..c1285fdf94a9 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java @@ -21,12 +21,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.ReactiveTransactionStatus; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; import org.springframework.transaction.TransactionSystemException; -import org.springframework.transaction.ReactiveTransactionManager; -import org.springframework.transaction.ReactiveTransactionStatus; -import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.util.Assert; /** @@ -34,26 +33,20 @@ * transaction exception handling. * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 * @see #execute * @see ReactiveTransactionManager */ @SuppressWarnings("serial") -class DefaultTransactionalOperator extends DefaultTransactionDefinition - implements TransactionalOperator { +final class DefaultTransactionalOperator implements TransactionalOperator { - private final Log logger = LogFactory.getLog(getClass()); + private static final Log logger = LogFactory.getLog(DefaultTransactionalOperator.class); private final ReactiveTransactionManager transactionManager; - /** - * Construct a new DefaultTransactionalOperator using the given transaction manager. - * @param transactionManager the transaction management strategy to be used - */ - DefaultTransactionalOperator(ReactiveTransactionManager transactionManager) { - Assert.notNull(transactionManager, "ReactiveTransactionManager must not be null"); - this.transactionManager = transactionManager; - } + private final TransactionDefinition transactionDefinition; + /** * Construct a new TransactionTemplate using the given transaction manager, @@ -63,9 +56,10 @@ class DefaultTransactionalOperator extends DefaultTransactionDefinition * default settings from. Local properties can still be set to change values. */ DefaultTransactionalOperator(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) { - super(transactionDefinition); Assert.notNull(transactionManager, "ReactiveTransactionManager must not be null"); + Assert.notNull(transactionManager, "TransactionDefinition must not be null"); this.transactionManager = transactionManager; + this.transactionDefinition = transactionDefinition; } @@ -76,29 +70,21 @@ public ReactiveTransactionManager getTransactionManager() { return this.transactionManager; } + @Override public Flux execute(ReactiveTransactionCallback action) throws TransactionException { - return TransactionContextManager.currentContext().flatMapMany(context -> { - - Mono status = this.transactionManager.getTransaction(this); - + Mono status = this.transactionManager.getTransaction(this.transactionDefinition); return status.flatMapMany(it -> { - // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. Flux retVal = Flux.from(action.doInTransaction(it)); - - return retVal.onErrorResume(ex -> { - // Transactional code threw application exception -> rollback - return rollbackOnException(it, ex).then(Mono.error(ex)); - }).materialize().flatMap(signal -> { - - if (signal.isOnComplete()) { - return transactionManager.commit(it).materialize(); - } - - return Mono.just(signal); + return retVal.onErrorResume(ex -> rollbackOnException(it, ex). + then(Mono.error(ex))).materialize().flatMap(signal -> { + if (signal.isOnComplete()) { + return this.transactionManager.commit(it).materialize(); + } + return Mono.just(signal); }).dematerialize(); }); }) @@ -113,13 +99,9 @@ public Flux execute(ReactiveTransactionCallback action) throws Transac * @throws TransactionException in case of a rollback error */ private Mono rollbackOnException(ReactiveTransactionStatus status, Throwable ex) throws TransactionException { - logger.debug("Initiating transaction rollback on application exception", ex); - return this.transactionManager.rollback(status).onErrorMap(ex2 -> { - logger.error("Application exception overridden by rollback exception", ex); - if (ex2 instanceof TransactionSystemException) { ((TransactionSystemException) ex2).initApplicationException(ex); } @@ -128,10 +110,16 @@ private Mono rollbackOnException(ReactiveTransactionStatus status, Throwab ); } + @Override public boolean equals(Object other) { return (this == other || (super.equals(other) && (!(other instanceof DefaultTransactionalOperator) || getTransactionManager() == ((DefaultTransactionalOperator) other).getTransactionManager()))); } + @Override + public int hashCode() { + return getTransactionManager().hashCode(); + } + } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java index 1f24ef436401..188587626a3d 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java @@ -16,11 +16,10 @@ package org.springframework.transaction.reactive; -import org.springframework.transaction.support.ResourceHolder; -import org.springframework.transaction.support.TransactionSynchronizationManager; - import reactor.core.publisher.Mono; +import org.springframework.transaction.support.ResourceHolder; + /** * {@link ReactiveTransactionSynchronization} implementation that manages a * {@link ResourceHolder} bound through {@link ReactiveTransactionSynchronizationManager}. @@ -47,9 +46,11 @@ public abstract class ReactiveResourceHolderSynchronization suspend() { if (this.holderActive) { - synchronizationManager.unbindResource(this.resourceKey); + this.synchronizationManager.unbindResource(this.resourceKey); } return Mono.empty(); } @@ -67,16 +68,11 @@ public Mono suspend() { @Override public Mono resume() { if (this.holderActive) { - synchronizationManager.bindResource(this.resourceKey, this.resourceHolder); + this.synchronizationManager.bindResource(this.resourceKey, this.resourceHolder); } return Mono.empty(); } - @Override - public Mono flush() { - return flushResource(this.resourceHolder); - } - @Override public Mono beforeCommit(boolean readOnly) { return Mono.empty(); @@ -85,13 +81,12 @@ public Mono beforeCommit(boolean readOnly) { @Override public Mono beforeCompletion() { if (shouldUnbindAtCompletion()) { - synchronizationManager.unbindResource(this.resourceKey); + this.synchronizationManager.unbindResource(this.resourceKey); this.holderActive = false; if (shouldReleaseBeforeCompletion()) { return releaseResource(this.resourceHolder, this.resourceKey); } } - return Mono.empty(); } @@ -100,15 +95,12 @@ public Mono afterCommit() { if (!shouldReleaseBeforeCompletion()) { return processResourceAfterCommit(this.resourceHolder); } - return Mono.empty(); } @Override public Mono afterCompletion(int status) { - return Mono.defer(() -> { - Mono sync = Mono.empty(); if (shouldUnbindAtCompletion()) { boolean releaseNecessary = false; @@ -116,20 +108,21 @@ public Mono afterCompletion(int status) { // The thread-bound resource holder might not be available anymore, // since afterCompletion might get called from a different thread. this.holderActive = false; - synchronizationManager.unbindResourceIfPossible(this.resourceKey); + this.synchronizationManager.unbindResourceIfPossible(this.resourceKey); this.resourceHolder.unbound(); releaseNecessary = true; - } else { + } + else { releaseNecessary = shouldReleaseAfterCompletion(this.resourceHolder); } if (releaseNecessary) { sync = releaseResource(this.resourceHolder, this.resourceKey); } - } else { + } + else { // Probably a pre-bound resource... sync = cleanupResource(this.resourceHolder, this.resourceKey, (status == STATUS_COMMITTED)); } - ; return sync.doFinally(s -> this.resourceHolder.reset()); }); } @@ -151,7 +144,6 @@ protected boolean shouldUnbindAtCompletion() { *

Note that resources will only be released when they are * unbound from the thread ({@link #shouldUnbindAtCompletion()}). *

The default implementation returns {@code true}. - * * @see #releaseResource */ protected boolean shouldReleaseBeforeCompletion() { @@ -163,27 +155,16 @@ protected boolean shouldReleaseBeforeCompletion() { * transaction completion ({@code true}). *

The default implementation returns {@code !shouldReleaseBeforeCompletion()}, * releasing after completion if no attempt was made before completion. - * * @see #releaseResource */ protected boolean shouldReleaseAfterCompletion(H resourceHolder) { return !shouldReleaseBeforeCompletion(); } - /** - * Flush callback for the given resource holder. - * - * @param resourceHolder the resource holder to flush - */ - protected Mono flushResource(H resourceHolder) { - return Mono.empty(); - } - /** * After-commit callback for the given resource holder. * Only called when the resource hasn't been released yet * ({@link #shouldReleaseBeforeCompletion()}). - * * @param resourceHolder the resource holder to process */ protected Mono processResourceAfterCommit(H resourceHolder) { @@ -192,7 +173,6 @@ protected Mono processResourceAfterCommit(H resourceHolder) { /** * Release the given resource (after it has been unbound from the thread). - * * @param resourceHolder the resource holder to process * @param resourceKey the key that the ResourceHolder was bound for */ @@ -202,7 +182,6 @@ protected Mono releaseResource(H resourceHolder, K resourceKey) { /** * Perform a cleanup on the given resource (which is left bound to the thread). - * * @param resourceHolder the resource holder to process * @param resourceKey the key that the ResourceHolder was bound for * @param committed whether the transaction has committed ({@code true}) diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java index 0409620d410c..4ee74803f11a 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java @@ -38,15 +38,10 @@ public interface ReactiveTransactionCallback { /** - * Gets called by {@link TransactionalOperator#transactional} within a transactional context. + * Gets called by {@link TransactionalOperator} within a transactional context. * Does not need to care about transactions itself, although it can retrieve and * influence the status of the current transaction via the given status object, * e.g. setting rollback-only. - *

Allows for returning a result object created within the transaction, i.e. a - * domain object or a collection of domain objects. A RuntimeException thrown by the - * callback is treated as application exception that enforces a rollback. Any such - * exception will be propagated to the caller of the template, unless there is a - * problem rolling back, in which case a TransactionException will be thrown. * @param status associated transaction status * @return a result publisher * @see TransactionalOperator#transactional diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java index 1ec9a58fb940..ed10f52530f3 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java @@ -18,8 +18,6 @@ import reactor.core.publisher.Mono; -import org.springframework.transaction.ReactiveTransactionStatus; - /** * Interface for reactive transaction synchronization callbacks. * Supported by {@link AbstractReactiveTransactionManager}. @@ -67,17 +65,8 @@ default Mono resume() { return Mono.empty(); } - /** - * Flush the underlying session to the datastore, if applicable. - * @see ReactiveTransactionStatus#flush() - */ - default Mono flush() { - return Mono.empty(); - } - /** * Invoked before transaction commit (before "beforeCompletion"). - * Can e.g. flush transactional O/R Mapping sessions to the database. *

This callback does not mean that the transaction will actually be committed. * A rollback decision can still occur after this method has been called. This callback * is rather meant to perform work that's only relevant if a commit still has a chance diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java index 306400624933..d8448e93c9d9 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java @@ -30,8 +30,6 @@ import org.springframework.core.annotation.AnnotationAwareOrderComparator; import org.springframework.lang.Nullable; import org.springframework.transaction.NoTransactionException; -import org.springframework.transaction.support.ResourceHolder; -import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.util.Assert; /** @@ -43,9 +41,9 @@ * to be removed before a new one can be set for the same key. * Supports a list of transaction synchronizations if synchronization is active. * - *

Resource management code should check for context-bound resources, e.g. database - * connections, via {@code getResource}. Such code is - * normally not supposed to bind resources to units of work, as this is the responsibility + *

Resource management code should check for context-bound resources, e.g. + * database connections, via {@code getResource}. Such code is normally not + * supposed to bind resources to units of work, as this is the responsibility * of transaction managers. A further option is to lazily bind on first use if * transaction synchronization is active, for performing transactions that span * an arbitrary number of resources. @@ -61,16 +59,15 @@ * isn't active, there is either no current transaction, or the transaction manager * doesn't support transaction synchronization. * - *

Synchronization is for example used to always return the same resources - * within a transaction, e.g. a database connection for - * any given Connectionfactory or DatabaseFactory. + *

Synchronization is for example used to always return the same resources within + * a transaction, e.g. a database connection for any given connection factory. * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 * @see #isSynchronizationActive * @see #registerSynchronization - * @see TransactionSynchronization - * @see AbstractReactiveTransactionManager#setTransactionSynchronization + * @see ReactiveTransactionSynchronization */ public class ReactiveTransactionSynchronizationManager { @@ -97,10 +94,8 @@ public static Mono currentTransaction /** * Check if there is a resource for the given key bound to the current thread. - * * @param key the key to check (usually the resource factory) * @return if there is a value bound to the current thread - * @see ResourceTransactionManager#getResourceFactory() */ public boolean hasResource(Object key) { Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); @@ -110,11 +105,9 @@ public boolean hasResource(Object key) { /** * Retrieve a resource for the given key that is bound to the current thread. - * * @param key the key to check (usually the resource factory) * @return a value bound to the current thread (usually the active * resource object), or {@code null} if none - * @see ResourceTransactionManager#getResourceFactory() */ @Nullable public Object getResource(Object key) { @@ -122,7 +115,7 @@ public Object getResource(Object key) { Object value = doGetResource(actualKey); if (value != null && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to context [" + - transactionContext.getName() + "]"); + this.transactionContext.getName() + "]"); } return value; } @@ -132,64 +125,50 @@ public Object getResource(Object key) { */ @Nullable private Object doGetResource(Object actualKey) { - Map map = transactionContext.getResources(); + Map map = this.transactionContext.getResources(); Object value = map.get(actualKey); - // Transparently remove ResourceHolder that was marked as void... - if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { - map.remove(actualKey); - value = null; - } return value; } /** * Bind the given resource for the given key to the current context. - * * @param key the key to bind the value to (usually the resource factory) * @param value the value to bind (usually the active resource object) * @throws IllegalStateException if there is already a value bound to the context - * @see ResourceTransactionManager#getResourceFactory() */ public void bindResource(Object key, Object value) throws IllegalStateException { Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); - Map map = transactionContext.getResources(); + Map map = this.transactionContext.getResources(); Object oldValue = map.put(actualKey, value); - // Transparently suppress a ResourceHolder that was marked as void... - if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) { - oldValue = null; - } if (oldValue != null) { throw new IllegalStateException("Already value [" + oldValue + "] for key [" + - actualKey + "] bound to context [" + transactionContext.getName() + "]"); + actualKey + "] bound to context [" + this.transactionContext.getName() + "]"); } if (logger.isTraceEnabled()) { logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to context [" + - transactionContext.getName() + "]"); + this.transactionContext.getName() + "]"); } } /** * Unbind a resource for the given key from the current context. - * * @param key the key to unbind (usually the resource factory) * @return the previously bound value (usually the active resource object) * @throws IllegalStateException if there is no value bound to the context - * @see ResourceTransactionManager#getResourceFactory() */ public Object unbindResource(Object key) throws IllegalStateException { Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doUnbindResource(actualKey); if (value == null) { throw new IllegalStateException( - "No value for key [" + actualKey + "] bound to context [" + transactionContext.getName() + "]"); + "No value for key [" + actualKey + "] bound to context [" + this.transactionContext.getName() + "]"); } return value; } /** * Unbind a resource for the given key from the current context. - * * @param key the key to unbind (usually the resource factory) * @return the previously bound value, or {@code null} if none bound */ @@ -204,19 +183,16 @@ public Object unbindResourceIfPossible(Object key) { */ @Nullable private Object doUnbindResource(Object actualKey) { - Map map = transactionContext.getResources(); + Map map = this.transactionContext.getResources(); Object value = map.remove(actualKey); - // Transparently suppress a ResourceHolder that was marked as void... - if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { - value = null; - } if (value != null && logger.isTraceEnabled()) { logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from context [" + - transactionContext.getName() + "]"); + this.transactionContext.getName() + "]"); } return value; } + //------------------------------------------------------------------------- // Management of transaction synchronizations //------------------------------------------------------------------------- @@ -224,17 +200,15 @@ private Object doUnbindResource(Object actualKey) { /** * Return if transaction synchronization is active for the current context. * Can be called before register to avoid unnecessary instance creation. - * * @see #registerSynchronization */ public boolean isSynchronizationActive() { - return (transactionContext.getSynchronizations() != null); + return (this.transactionContext.getSynchronizations() != null); } /** * Activate transaction synchronization for the current context. * Called by a transaction manager on transaction begin. - * * @throws IllegalStateException if synchronization is already active */ public void initSynchronization() throws IllegalStateException { @@ -242,7 +216,7 @@ public void initSynchronization() throws IllegalStateException { throw new IllegalStateException("Cannot activate transaction synchronization - already active"); } logger.trace("Initializing transaction synchronization"); - transactionContext.setSynchronizations(new LinkedHashSet<>()); + this.transactionContext.setSynchronizations(new LinkedHashSet<>()); } /** @@ -251,7 +225,6 @@ public void initSynchronization() throws IllegalStateException { *

Note that synchronizations can implement the * {@link org.springframework.core.Ordered} interface. * They will be executed in an order according to their order value (if any). - * * @param synchronization the synchronization object to register * @throws IllegalStateException if transaction synchronization is not active * @see org.springframework.core.Ordered @@ -260,22 +233,22 @@ public void registerSynchronization(ReactiveTransactionSynchronization synchroni throws IllegalStateException { Assert.notNull(synchronization, "TransactionSynchronization must not be null"); - if (!isSynchronizationActive()) { + Set synchs = this.transactionContext.getSynchronizations(); + if (synchs == null) { throw new IllegalStateException("Transaction synchronization is not active"); } - transactionContext.getSynchronizations().add(synchronization); + synchs.add(synchronization); } /** * Return an unmodifiable snapshot list of all registered synchronizations * for the current context. - * * @return unmodifiable List of TransactionSynchronization instances * @throws IllegalStateException if synchronization is not active - * @see TransactionSynchronization + * @see ReactiveTransactionSynchronization */ public List getSynchronizations() throws IllegalStateException { - Set synchs = transactionContext.getSynchronizations(); + Set synchs = this.transactionContext.getSynchronizations(); if (synchs == null) { throw new IllegalStateException("Transaction synchronization is not active"); } @@ -284,7 +257,8 @@ public List getSynchronizations() throws Ill // might register further synchronizations. if (synchs.isEmpty()) { return Collections.emptyList(); - } else { + } + else { // Sort lazily here, not in registerSynchronization. List sortedSynchs = new ArrayList<>(synchs); AnnotationAwareOrderComparator.sort(sortedSynchs); @@ -295,7 +269,6 @@ public List getSynchronizations() throws Ill /** * Deactivate transaction synchronization for the current context. * Called by the transaction manager on transaction cleanup. - * * @throws IllegalStateException if synchronization is not active */ public void clearSynchronization() throws IllegalStateException { @@ -303,9 +276,10 @@ public void clearSynchronization() throws IllegalStateException { throw new IllegalStateException("Cannot deactivate transaction synchronization - not active"); } logger.trace("Clearing transaction synchronization"); - transactionContext.setSynchronizations(null); + this.transactionContext.setSynchronizations(null); } + //------------------------------------------------------------------------- // Exposure of transaction characteristics //------------------------------------------------------------------------- @@ -313,59 +287,53 @@ public void clearSynchronization() throws IllegalStateException { /** * Expose the name of the current transaction, if any. * Called by the transaction manager on transaction begin and on cleanup. - * * @param name the name of the transaction, or {@code null} to reset it * @see org.springframework.transaction.TransactionDefinition#getName() */ public void setCurrentTransactionName(@Nullable String name) { - transactionContext.setCurrentTransactionName(name); + this.transactionContext.setCurrentTransactionName(name); } /** * Return the name of the current transaction, or {@code null} if none set. * To be called by resource management code for optimizations per use case, * for example to optimize fetch strategies for specific named transactions. - * * @see org.springframework.transaction.TransactionDefinition#getName() */ @Nullable public String getCurrentTransactionName() { - return transactionContext.getCurrentTransactionName(); + return this.transactionContext.getCurrentTransactionName(); } /** * Expose a read-only flag for the current transaction. * Called by the transaction manager on transaction begin and on cleanup. - * * @param readOnly {@code true} to mark the current transaction * as read-only; {@code false} to reset such a read-only marker * @see org.springframework.transaction.TransactionDefinition#isReadOnly() */ public void setCurrentTransactionReadOnly(boolean readOnly) { - transactionContext.setCurrentTransactionReadOnly(readOnly); + this.transactionContext.setCurrentTransactionReadOnly(readOnly); } /** * Return whether the current transaction is marked as read-only. * To be called by resource management code when preparing a newly - * created resource (for example, a Hibernate Session). + * created resource. *

Note that transaction synchronizations receive the read-only flag * as argument for the {@code beforeCommit} callback, to be able * to suppress change detection on commit. The present method is meant - * to be used for earlier read-only checks, for example to set the - * flush mode of a Hibernate Session to "FlushMode.NEVER" upfront. - * + * to be used for earlier read-only checks. * @see org.springframework.transaction.TransactionDefinition#isReadOnly() - * @see TransactionSynchronization#beforeCommit(boolean) + * @see ReactiveTransactionSynchronization#beforeCommit(boolean) */ public boolean isCurrentTransactionReadOnly() { - return transactionContext.isCurrentTransactionReadOnly(); + return this.transactionContext.isCurrentTransactionReadOnly(); } /** * Expose an isolation level for the current transaction. * Called by the transaction manager on transaction begin and on cleanup. - * * @param isolationLevel the isolation level to expose, according to the * R2DBC Connection constants (equivalent to the corresponding Spring * TransactionDefinition constants), or {@code null} to reset it @@ -376,14 +344,13 @@ public boolean isCurrentTransactionReadOnly() { * @see org.springframework.transaction.TransactionDefinition#getIsolationLevel() */ public void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel) { - transactionContext.setCurrentTransactionIsolationLevel(isolationLevel); + this.transactionContext.setCurrentTransactionIsolationLevel(isolationLevel); } /** * Return the isolation level for the current transaction, if any. * To be called by resource management code when preparing a newly * created resource (for example, a R2DBC Connection). - * * @return the currently exposed isolation level, according to the * R2DBC Connection constants (equivalent to the corresponding Spring * TransactionDefinition constants), or {@code null} if none @@ -395,18 +362,17 @@ public void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel */ @Nullable public Integer getCurrentTransactionIsolationLevel() { - return transactionContext.getCurrentTransactionIsolationLevel(); + return this.transactionContext.getCurrentTransactionIsolationLevel(); } /** * Expose whether there currently is an actual transaction active. * Called by the transaction manager on transaction begin and on cleanup. - * * @param active {@code true} to mark the current context as being associated * with an actual transaction; {@code false} to reset that marker */ public void setActualTransactionActive(boolean active) { - transactionContext.setActualTransactionActive(active); + this.transactionContext.setActualTransactionActive(active); } /** @@ -418,17 +384,15 @@ public void setActualTransactionActive(boolean active) { * resource transaction; also on PROPAGATION_SUPPORTS) and an actual * transaction being active (with backing resource transaction; * on PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, etc). - * * @see #isSynchronizationActive() */ public boolean isActualTransactionActive() { - return transactionContext.isActualTransactionActive(); + return this.transactionContext.isActualTransactionActive(); } /** * Clear the entire transaction synchronization state: * registered synchronizations as well as the various transaction characteristics. - * * @see #clearSynchronization() * @see #setCurrentTransactionName * @see #setCurrentTransactionReadOnly @@ -436,11 +400,11 @@ public boolean isActualTransactionActive() { * @see #setActualTransactionActive */ public void clear() { - transactionContext.clear(); + this.transactionContext.clear(); } private Map getResources() { - return transactionContext.getResources(); + return this.transactionContext.getResources(); } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java index 677cb8122a1e..0913bdae3606 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java @@ -25,7 +25,6 @@ import org.springframework.aop.scope.ScopedObject; import org.springframework.core.InfrastructureProxy; -import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -34,11 +33,12 @@ * callback methods on all currently registered synchronizations. * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 * @see ReactiveTransactionSynchronization * @see ReactiveTransactionSynchronizationManager#getSynchronizations() */ -public abstract class ReactiveTransactionSynchronizationUtils { +abstract class ReactiveTransactionSynchronizationUtils { private static final Log logger = LogFactory.getLog(ReactiveTransactionSynchronizationUtils.class); @@ -66,49 +66,14 @@ static Object unwrapResourceIfNecessary(Object resource) { } - /** - * Trigger {@code flush} callbacks on all currently registered synchronizations. - * @throws RuntimeException if thrown by a {@code flush} callback - * @see ReactiveTransactionSynchronization#flush() - */ - public static Mono triggerFlush() { - return TransactionContextManager.currentContext().flatMapIterable(TransactionContext::getSynchronizations).concatMap(ReactiveTransactionSynchronization::flush).then(); - } - - /** - * Trigger {@code beforeCommit} callbacks on all currently registered synchronizations. - * - * @param readOnly whether the transaction is defined as read-only transaction - * @throws RuntimeException if thrown by a {@code beforeCommit} callback - * @see ReactiveTransactionSynchronization#beforeCommit(boolean) - */ - public static Mono triggerBeforeCommit(boolean readOnly) { - return TransactionContextManager.currentContext() - .map(TransactionContext::getSynchronizations) - .flatMap(it -> triggerBeforeCommit(it, readOnly)).then(); - } - /** * Actually invoke the {@code triggerBeforeCommit} methods of the * given Spring ReactiveTransactionSynchronization objects. - * * @param synchronizations a List of ReactiveTransactionSynchronization objects * @see ReactiveTransactionSynchronization#beforeCommit(boolean) */ public static Mono triggerBeforeCommit(Collection synchronizations, boolean readOnly) { - return Flux.fromIterable(synchronizations).concatMap(it -> it.beforeCommit(readOnly)) - .then(); - } - - /** - * Trigger {@code beforeCompletion} callbacks on all currently registered synchronizations. - * @see ReactiveTransactionSynchronization#beforeCompletion() - */ - public static Mono triggerBeforeCompletion() { - - return TransactionContextManager.currentContext() - .map(TransactionContext::getSynchronizations) - .flatMap(ReactiveTransactionSynchronizationUtils::triggerBeforeCompletion); + return Flux.fromIterable(synchronizations).concatMap(it -> it.beforeCommit(readOnly)).then(); } /** @@ -118,29 +83,16 @@ public static Mono triggerBeforeCompletion() { * @see ReactiveTransactionSynchronization#beforeCompletion() */ public static Mono triggerBeforeCompletion(Collection synchronizations) { - return Flux.fromIterable(synchronizations) - .concatMap(ReactiveTransactionSynchronization::beforeCompletion).onErrorContinue((t, o) -> { - logger.error("TransactionSynchronization.beforeCompletion threw exception", t); - }).then(); - } - - /** - * Trigger {@code afterCommit} callbacks on all currently registered synchronizations. - * @throws RuntimeException if thrown by a {@code afterCommit} callback - * @see ReactiveTransactionSynchronizationManager#getSynchronizations() - * @see ReactiveTransactionSynchronization#afterCommit() - */ - public static Mono triggerAfterCommit() { - return TransactionContextManager.currentContext() - .flatMap(it -> invokeAfterCommit(it.getSynchronizations())); + .concatMap(ReactiveTransactionSynchronization::beforeCompletion).onErrorContinue((t, o) -> + logger.error("TransactionSynchronization.beforeCompletion threw exception", t)).then(); } /** * Actually invoke the {@code afterCommit} methods of the * given Spring ReactiveTransactionSynchronization objects. * @param synchronizations a List of ReactiveTransactionSynchronization objects - * @see TransactionSynchronization#afterCommit() + * @see ReactiveTransactionSynchronization#afterCommit() */ public static Mono invokeAfterCommit(Collection synchronizations) { return Flux.fromIterable(synchronizations) @@ -148,21 +100,6 @@ public static Mono invokeAfterCommit(Collection triggerAfterCompletion(int completionStatus) { - return TransactionContextManager.currentContext() - .flatMap(it -> invokeAfterCompletion(it.getSynchronizations(), completionStatus)); - } - /** * Actually invoke the {@code afterCompletion} methods of the * given Spring ReactiveTransactionSynchronization objects. @@ -174,13 +111,11 @@ public static Mono triggerAfterCompletion(int completionStatus) { * @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN */ - public static Mono invokeAfterCompletion(Collection synchronizations, - int completionStatus) { + public static Mono invokeAfterCompletion( + Collection synchronizations, int completionStatus) { return Flux.fromIterable(synchronizations).concatMap(it -> it.afterCompletion(completionStatus)) - .onErrorContinue((t, o) -> { - logger.error("TransactionSynchronization.afterCompletion threw exception", t); - }).then(); + .onErrorContinue((t, o) -> logger.error("TransactionSynchronization.afterCompletion threw exception", t)).then(); } @@ -189,7 +124,7 @@ public static Mono invokeAfterCompletion(Collection resources = new LinkedHashMap<>(); - private @Nullable Set synchronizations; + @Nullable + private Set synchronizations; private volatile @Nullable String currentTransactionName; @@ -51,8 +55,6 @@ public class TransactionContext { private volatile boolean actualTransactionActive; - private final @Nullable TransactionContext parent; - TransactionContext() { this(null); @@ -63,78 +65,76 @@ public class TransactionContext { } - public void clear() { - - synchronizations = null; - currentTransactionName = null; - currentTransactionReadOnly = false; - currentTransactionIsolationLevel = null; - actualTransactionActive = false; + @Nullable + public TransactionContext getParent() { + return this.parent; } public String getName() { - - if (StringUtils.hasText(currentTransactionName)) { - return contextId + ": " + currentTransactionName; + if (StringUtils.hasText(this.currentTransactionName)) { + return this.contextId + ": " + this.currentTransactionName; } - - return contextId.toString(); + return this.contextId.toString(); } public UUID getContextId() { - return contextId; + return this.contextId; } public Map getResources() { - return resources; - } - - @Nullable - public Set getSynchronizations() { - return synchronizations; + return this.resources; } - public void setSynchronizations(@org.springframework.lang.Nullable Set synchronizations) { + public void setSynchronizations(@Nullable Set synchronizations) { this.synchronizations = synchronizations; } @Nullable - public String getCurrentTransactionName() { - return currentTransactionName; + public Set getSynchronizations() { + return this.synchronizations; } public void setCurrentTransactionName(@Nullable String currentTransactionName) { this.currentTransactionName = currentTransactionName; } - public boolean isCurrentTransactionReadOnly() { - return currentTransactionReadOnly; + @Nullable + public String getCurrentTransactionName() { + return this.currentTransactionName; } public void setCurrentTransactionReadOnly(boolean currentTransactionReadOnly) { this.currentTransactionReadOnly = currentTransactionReadOnly; } - @Nullable - public Integer getCurrentTransactionIsolationLevel() { - return currentTransactionIsolationLevel; + public boolean isCurrentTransactionReadOnly() { + return this.currentTransactionReadOnly; } public void setCurrentTransactionIsolationLevel(@Nullable Integer currentTransactionIsolationLevel) { this.currentTransactionIsolationLevel = currentTransactionIsolationLevel; } - public boolean isActualTransactionActive() { - return actualTransactionActive; + @Nullable + public Integer getCurrentTransactionIsolationLevel() { + return this.currentTransactionIsolationLevel; } public void setActualTransactionActive(boolean actualTransactionActive) { this.actualTransactionActive = actualTransactionActive; } - @Nullable - public TransactionContext getParent() { - return parent; + public boolean isActualTransactionActive() { + return this.actualTransactionActive; + } + + + public void clear() { + this.synchronizations = null; + this.currentTransactionName = null; + this.currentTransactionReadOnly = false; + this.currentTransactionIsolationLevel = null; + this.actualTransactionActive = false; } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextHolder.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextHolder.java index 5ae5f47fb043..44a8997b55f2 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextHolder.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextHolder.java @@ -13,61 +13,65 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.springframework.transaction.reactive; -import java.util.Stack; +import java.util.Deque; import org.springframework.transaction.NoTransactionException; /** * Mutable holder for reactive transaction {@link TransactionContext contexts}. * This holder keeps references to individual {@link TransactionContext}s. + * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 * @see TransactionContext */ -class TransactionContextHolder { +final class TransactionContextHolder { - private final Stack transactionStack; + private final Deque transactionStack; - TransactionContextHolder(Stack transactionStack) { + TransactionContextHolder(Deque transactionStack) { this.transactionStack = transactionStack; } /** * Return the current {@link TransactionContext}. - * @return the current {@link TransactionContext}. - * @throws NoTransactionException if no transaction is ongoing. + * @throws NoTransactionException if no transaction is ongoing */ TransactionContext currentContext() { - TransactionContext context = (transactionStack.isEmpty() ? null : transactionStack.peek()); - + TransactionContext context = this.transactionStack.peek(); if (context == null) { throw new NoTransactionException("No transaction in context"); } - return context; } /** * Create a new {@link TransactionContext}. - * @return the new {@link TransactionContext}. */ TransactionContext createContext() { - TransactionContext context = (transactionStack.isEmpty() ? null : transactionStack.peek()); - - return (context == null ? transactionStack.push(new TransactionContext()) : - transactionStack.push(new TransactionContext(context))); + TransactionContext context = this.transactionStack.peek(); + if (context != null) { + context = new TransactionContext(context); + } + else { + context = new TransactionContext(); + } + this.transactionStack.push(context); + return context; } /** * Check whether the holder has a {@link TransactionContext}. - * @return {@literal true} if a {@link TransactionContext} is associated. + * @return {@literal true} if a {@link TransactionContext} is associated */ boolean hasContext() { - return !transactionStack.isEmpty(); + return !this.transactionStack.isEmpty(); } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java index 5071ce302146..f35e8830b142 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java @@ -16,7 +16,7 @@ package org.springframework.transaction.reactive; -import java.util.Stack; +import java.util.ArrayDeque; import java.util.function.Function; import reactor.core.publisher.Flux; @@ -27,9 +27,9 @@ /** * Delegate to register and obtain transactional contexts. - *

- * Typically used by components that intercept or orchestrate transactional flows such as AOP interceptors or - * transactional operators. + * + *

Typically used by components that intercept or orchestrate transactional flows + * such as AOP interceptors or transactional operators. * * @author Mark Paluch * @since 5.2 @@ -37,9 +37,8 @@ */ public abstract class TransactionContextManager { - private TransactionContextManager() { - /* prevent instantiation */ - } + private TransactionContextManager() { + } /** @@ -51,14 +50,11 @@ private TransactionContextManager() { * or no context found in a holder */ public static Mono currentContext() throws NoTransactionException { - return Mono.subscriberContext().handle((ctx, sink) -> { - if (ctx.hasKey(TransactionContext.class)) { sink.next(ctx.get(TransactionContext.class)); return; } - if (ctx.hasKey(TransactionContextHolder.class)) { TransactionContextHolder holder = ctx.get(TransactionContextHolder.class); if (holder.hasContext()) { @@ -66,7 +62,6 @@ public static Mono currentContext() throws NoTransactionExce return; } } - sink.error(new NoTransactionException("No transaction in context")); }); } @@ -74,9 +69,9 @@ public static Mono currentContext() throws NoTransactionExce /** * Create a {@link TransactionContext} and register it in the subscriber {@link Context}. * @return functional context registration. + * @throws IllegalStateException if a transaction context is already associated. * @see Mono#subscriberContext(Function) * @see Flux#subscriberContext(Function) - * @throws IllegalStateException if a transaction context is already associated. */ public static Function createTransactionContext() { return context -> context.put(TransactionContext.class, new TransactionContext()); @@ -91,13 +86,10 @@ public static Function createTransactionContext() { */ public static Function getOrCreateContext() { return context -> { - TransactionContextHolder holder = context.get(TransactionContextHolder.class); - if (holder.hasContext()) { context.put(TransactionContext.class, holder.currentContext()); } - return context.put(TransactionContext.class, holder.createContext()); }; } @@ -111,11 +103,9 @@ public static Function getOrCreateContext() { * @return functional context registration. */ public static Function getOrCreateContextHolder() { - return context -> { - if (!context.hasKey(TransactionContextHolder.class)) { - return context.put(TransactionContextHolder.class, new TransactionContextHolder(new Stack<>())); + return context.put(TransactionContextHolder.class, new TransactionContextHolder(new ArrayDeque<>())); } return context; }; diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java index 5faa80c36c1f..34382e2c2db3 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java @@ -19,9 +19,9 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.transaction.ReactiveTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; -import org.springframework.transaction.ReactiveTransactionManager; /** * Operator class that simplifies programmatic transaction demarcation and @@ -38,40 +38,24 @@ * application services utilizing this class, making calls to the low-level * services via an inner-class callback object. * - *

Can be used within a service implementation via direct instantiation with - * a transaction manager reference, or get prepared in an application context - * and passed to services as bean reference. Note: The transaction manager should - * always be configured as bean in the application context: in the first case given - * to the service directly, in the second case given to the prepared template. - * - *

Supports setting the propagation behavior and the isolation level by name, - * for convenient configuration in context definitions. - * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 * @see #execute * @see ReactiveTransactionManager */ public interface TransactionalOperator { - /** - * Create a new {@link TransactionalOperator} using {@link ReactiveTransactionManager}. - * @param transactionManager the transaction management strategy to be used - * @return the transactional operator - */ - static TransactionalOperator create(ReactiveTransactionManager transactionManager){ - return new DefaultTransactionalOperator(transactionManager); - } - /** * Create a new {@link TransactionalOperator} using {@link ReactiveTransactionManager} * and {@link TransactionDefinition}. - * * @param transactionManager the transaction management strategy to be used - * @param transactionDefinition the transaction definition to apply. + * @param transactionDefinition the transaction definition to apply * @return the transactional operator */ - static TransactionalOperator create(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition){ + static TransactionalOperator create( + ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition){ + return new DefaultTransactionalOperator(transactionManager, transactionDefinition); } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/package-info.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/package-info.java index 3b8526e87fa4..ef3661198641 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/package-info.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/package-info.java @@ -1,5 +1,5 @@ /** - * Support classes for the org.springframework.transaction.reactive package. + * Support classes for reactive transaction management. * Provides an abstract base class for reactive transaction manager implementations, * and a transactional operator plus callback for transaction demarcation. */ diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java index 80db3e0afe1f..b6b4e58b547c 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -48,7 +48,6 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction) { this.existingTransaction = existingTransaction; this.canCreateTransaction = canCreateTransaction; - setTransactionSynchronization(SYNCHRONIZATION_NEVER); } @@ -59,7 +58,7 @@ protected Object doGetTransaction(ReactiveTransactionSynchronizationManager sync @Override protected boolean isExistingTransaction(Object transaction) { - return existingTransaction; + return this.existingTransaction; } @Override @@ -96,4 +95,5 @@ protected Mono doSetRollbackOnly(ReactiveTransactionSynchronizationManager } return Mono.fromRunnable(() -> this.rollbackOnly = true); } + } diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java index 258ec59160cf..5a3b26916106 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -87,7 +87,8 @@ public void existingTransaction() { @Test public void commitWithoutExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); - tm.getTransaction(null).flatMap(tm::commit).subscriberContext(TransactionContextManager.createTransactionContext()) + tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit) + .subscriberContext(TransactionContextManager.createTransactionContext()) .as(StepVerifier::create).verifyComplete(); assertHasBegan(tm); @@ -99,7 +100,7 @@ public void commitWithoutExistingTransaction() { @Test public void rollbackWithoutExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); - tm.getTransaction(null).flatMap(tm::rollback) + tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback) .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .verifyComplete(); @@ -112,7 +113,8 @@ public void rollbackWithoutExistingTransaction() { @Test public void rollbackOnlyWithoutExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); - tm.getTransaction(null).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit) + tm.getTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransactionStatus::setRollbackOnly) + .flatMap(tm::commit) .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .verifyComplete(); @@ -125,7 +127,8 @@ public void rollbackOnlyWithoutExistingTransaction() { @Test public void commitWithExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); - tm.getTransaction(null).flatMap(tm::commit).subscriberContext(TransactionContextManager.createTransactionContext()) + tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit) + .subscriberContext(TransactionContextManager.createTransactionContext()) .as(StepVerifier::create).verifyComplete(); assertHasNotBegan(tm); @@ -137,7 +140,7 @@ public void commitWithExistingTransaction() { @Test public void rollbackWithExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); - tm.getTransaction(null).flatMap(tm::rollback) + tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback) .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .verifyComplete(); @@ -150,7 +153,7 @@ public void rollbackWithExistingTransaction() { @Test public void rollbackOnlyWithExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); - tm.getTransaction(null).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit) + tm.getTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit) .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .verifyComplete(); @@ -163,7 +166,7 @@ public void rollbackOnlyWithExistingTransaction() { @Test public void transactionTemplate() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); - TransactionalOperator operator = TransactionalOperator.create(tm); + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); Flux.just("Walter").as(operator::transactional) .as(StepVerifier::create) @@ -179,7 +182,7 @@ public void transactionTemplate() { @Test public void transactionTemplateWithException() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); - TransactionalOperator operator = TransactionalOperator.create(tm); + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); RuntimeException ex = new RuntimeException("Some application exception"); Mono.error(ex).as(operator::transactional) diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java index f52d893c5dad..c6308e7e86df 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java @@ -21,6 +21,8 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import org.springframework.transaction.support.DefaultTransactionDefinition; + import static org.junit.Assert.*; /** @@ -32,57 +34,47 @@ public class TransactionalOperatorTests { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); + @Test public void commitWithMono() { - - TransactionalOperator operator = TransactionalOperator.create(tm); - + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); Mono.just(true).as(operator::transactional) .as(StepVerifier::create) .expectNext(true) .verifyComplete(); - assertTrue(tm.commit); assertFalse(tm.rollback); } @Test public void rollbackWithMono() { - - TransactionalOperator operator = TransactionalOperator.create(tm); - + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); Mono.error(new IllegalStateException()).as(operator::transactional) .as(StepVerifier::create) .verifyError(IllegalStateException.class); - assertFalse(tm.commit); assertTrue(tm.rollback); } @Test public void commitWithFlux() { - - TransactionalOperator operator = TransactionalOperator.create(tm); - + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); Flux.just(true).as(operator::transactional) .as(StepVerifier::create) .expectNext(true) .verifyComplete(); - assertTrue(tm.commit); assertFalse(tm.rollback); } @Test public void rollbackWithFlux() { - - TransactionalOperator operator = TransactionalOperator.create(tm); - + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); Flux.error(new IllegalStateException()).as(operator::transactional) .as(StepVerifier::create) .verifyError(IllegalStateException.class); - assertFalse(tm.commit); assertTrue(tm.rollback); } + }