Skip to content

Commit

Permalink
Ensure reactive transaction rollback on commit error
Browse files Browse the repository at this point in the history
This change fixes a situation where error handling was skipped during
`processCommit()` in case the `doCommit()` failed. The error handling
was set up via an `onErrorResume` operator that was nested inside a
`then(...)`, applied to an inner `Mono.empty()`. As a consequence,
it would never receive an error signal (effectively decoupling the
onErrorResume from the main chain).

This change simply moves the error handling back one level up. It also
simplifies the `doCommit` code a bit by getting rid of the steps that
artificially introduce a `Mono<Object>` return type, which is not really
needed.

A pre-existing test was missing the fact that the rollback didn't occur,
which is now fixed. Another dedicated test is introduced building upon
the `ReactiveTestTransactionManager` class.

Closes gh-30096
  • Loading branch information
simonbasle authored and sdeleuze committed Mar 10, 2023
1 parent 2e5d047 commit 9b50c0d
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ void testCommitFails() {
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock).createStatement("foo");
verify(connectionMock).commitTransaction();
verify(connectionMock).rollbackTransaction();
verify(connectionMock).close();
verifyNoMoreInteractions(connectionMock);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ private Mono<Void> processCommit(TransactionSynchronizationManager synchronizati

AtomicBoolean beforeCompletionInvoked = new AtomicBoolean();

Mono<Object> commit = prepareForCommit(synchronizationManager, status)
Mono<Void> commit = prepareForCommit(synchronizationManager, status)
.then(triggerBeforeCommit(synchronizationManager, status))
.then(triggerBeforeCompletion(synchronizationManager, status))
.then(Mono.defer(() -> {
Expand All @@ -445,11 +445,12 @@ private Mono<Void> processCommit(TransactionSynchronizationManager synchronizati
return doCommit(synchronizationManager, status);
}
return Mono.empty();
})).then(Mono.empty().onErrorResume(ex -> {
Mono<Object> propagateException = Mono.error(ex);
})) //
.onErrorResume(ex -> {
Mono<Void> propagateException = Mono.error(ex);
// Store result in a local variable in order to appease the
// Eclipse compiler with regard to inferred generics.
Mono<Object> result = propagateException;
Mono<Void> result = propagateException;
if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) {
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)
.then(propagateException);
Expand All @@ -471,7 +472,8 @@ else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) {
}

return result;
})).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
})
.then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex)))
.then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED))));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager

private final boolean canCreateTransaction;

private final boolean forceFailOnCommit;

protected boolean begin = false;

protected boolean commit = false;
Expand All @@ -48,8 +50,13 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager


ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction) {
this(existingTransaction, canCreateTransaction, false);
}

ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction, boolean forceFailOnCommit) {
this.existingTransaction = existingTransaction;
this.canCreateTransaction = canCreateTransaction;
this.forceFailOnCommit = forceFailOnCommit;
}


Expand Down Expand Up @@ -79,7 +86,12 @@ protected Mono<Void> doCommit(TransactionSynchronizationManager synchronizationM
if (!TRANSACTION.equals(status.getTransaction())) {
return Mono.error(new IllegalArgumentException("Not the same transaction object"));
}
return Mono.fromRunnable(() -> this.commit = true);
return Mono.fromRunnable(() -> {
this.commit = true;
if (this.forceFailOnCommit) {
throw new IllegalArgumentException("Forced failure on commit");
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,22 @@ public void transactionTemplateWithException() {
assertHasCleanedUp(tm);
}

//gh-28968
@Test
void errorInCommitDoesInitiateRollbackAfterCommit() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true, true);
TransactionalOperator rxtx = TransactionalOperator.create(tm);

StepVerifier.create(rxtx.transactional(Mono.just("bar")))
.verifyErrorMessage("Forced failure on commit");

assertHasBegan(tm);
assertHasCommitted(tm);
assertHasRolledBack(tm);
assertHasNotSetRollbackOnly(tm);
assertHasCleanedUp(tm);
}

private void assertHasBegan(ReactiveTestTransactionManager actual) {
assertThat(actual.begin).as("Expected <ReactiveTransactionManager.begin()> but was <begin()> was not invoked").isTrue();
}
Expand Down

0 comments on commit 9b50c0d

Please sign in to comment.