diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java index 870de048e88b..88cef8273a80 100644 --- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java @@ -270,6 +270,7 @@ void testCommitFails() { verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).createStatement("foo"); verify(connectionMock).commitTransaction(); + verify(connectionMock).rollbackTransaction(); verify(connectionMock).close(); verifyNoMoreInteractions(connectionMock); } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java index cdc434fc7361..85855bafcc1c 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java @@ -433,7 +433,7 @@ private Mono processCommit(TransactionSynchronizationManager synchronizati AtomicBoolean beforeCompletionInvoked = new AtomicBoolean(); - Mono commit = prepareForCommit(synchronizationManager, status) + Mono commit = prepareForCommit(synchronizationManager, status) .then(triggerBeforeCommit(synchronizationManager, status)) .then(triggerBeforeCompletion(synchronizationManager, status)) .then(Mono.defer(() -> { @@ -445,11 +445,12 @@ private Mono processCommit(TransactionSynchronizationManager synchronizati return doCommit(synchronizationManager, status); } return Mono.empty(); - })).then(Mono.empty().onErrorResume(ex -> { - Mono propagateException = Mono.error(ex); + })) // + .onErrorResume(ex -> { + Mono propagateException = Mono.error(ex); // Store result in a local variable in order to appease the // Eclipse compiler with regard to inferred generics. - Mono result = propagateException; + Mono result = propagateException; if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) { result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK) .then(propagateException); @@ -471,7 +472,8 @@ else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) { } return result; - })).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex -> + }) + .then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex -> triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex))) .then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED)))); diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java index 217551cc85ac..e43f341b2c8e 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java @@ -36,6 +36,8 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager private final boolean canCreateTransaction; + private final boolean forceFailOnCommit; + protected boolean begin = false; protected boolean commit = false; @@ -48,8 +50,13 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction) { + this(existingTransaction, canCreateTransaction, false); + } + + ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction, boolean forceFailOnCommit) { this.existingTransaction = existingTransaction; this.canCreateTransaction = canCreateTransaction; + this.forceFailOnCommit = forceFailOnCommit; } @@ -79,7 +86,12 @@ protected Mono doCommit(TransactionSynchronizationManager synchronizationM if (!TRANSACTION.equals(status.getTransaction())) { return Mono.error(new IllegalArgumentException("Not the same transaction object")); } - return Mono.fromRunnable(() -> this.commit = true); + return Mono.fromRunnable(() -> { + this.commit = true; + if (this.forceFailOnCommit) { + throw new IllegalArgumentException("Forced failure on commit"); + } + }); } @Override diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportTests.java index 4912c96226a0..20db5ea65d62 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportTests.java @@ -203,6 +203,22 @@ public void transactionTemplateWithException() { assertHasCleanedUp(tm); } + //gh-28968 + @Test + void errorInCommitDoesInitiateRollbackAfterCommit() { + ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true, true); + TransactionalOperator rxtx = TransactionalOperator.create(tm); + + StepVerifier.create(rxtx.transactional(Mono.just("bar"))) + .verifyErrorMessage("Forced failure on commit"); + + assertHasBegan(tm); + assertHasCommitted(tm); + assertHasRolledBack(tm); + assertHasNotSetRollbackOnly(tm); + assertHasCleanedUp(tm); + } + private void assertHasBegan(ReactiveTestTransactionManager actual) { assertThat(actual.begin).as("Expected but was was not invoked").isTrue(); }