Skip to content

Commit

Permalink
Avoid rollback after a commit failure in TransactionalOperator
Browse files Browse the repository at this point in the history
A failure to commit a reactive transaction will complete the
transaction and clean up resources. Executing a rollback at
that point is invalid, which causes an
IllegalTransactionStateException that masks the cause of the
commit failure.

This change restructures TransactionalOperatorImpl and
ReactiveTransactionSupport to avoid executing a rollback after
a failed commit. While there, the Mono transaction handling in
TransactionalOperator is simplified by moving it to a default
method on the interface.

Closes gh-27572
  • Loading branch information
EnricSala authored and sdeleuze committed Mar 6, 2023
1 parent 9548101 commit edf0ae7
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ void testCommitFails() {
.doOnNext(connection -> connection.createStatement("foo")).then()
.as(operator::transactional)
.as(StepVerifier::create)
.verifyError(IllegalTransactionStateException.class);
.verifyError(BadSqlGrammarException.class);

verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
Expand Down Expand Up @@ -317,7 +317,7 @@ void testRollbackFails() {
return ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.doOnNext(connection -> connection.createStatement("foo")).then();
}).as(StepVerifier::create)
.verifyError(IllegalTransactionStateException.class);
.verifyError(BadSqlGrammarException.class);

verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
* @author Sam Brannen
* @author Mark Paluch
* @author Sebastien Deleuze
* @author Enric Sala
* @since 1.1
* @see PlatformTransactionManager
* @see ReactiveTransactionManager
Expand Down Expand Up @@ -919,60 +920,41 @@ public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetCl
!COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()))) {

return TransactionContextManager.currentContext().flatMap(context ->
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMap(it -> {
try {
// Need re-wrapping until we get hold of the exception through usingWhen.
return Mono.<Object, ReactiveTransactionInfo>usingWhen(
Mono.just(it),
txInfo -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
(txInfo, err) -> Mono.empty(),
this::rollbackTransactionOnCancel)
.onErrorResume(ex ->
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
}
catch (Throwable ex) {
// target invocation exception
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
}
})).contextWrite(TransactionContextManager.getOrCreateContext())
Mono.<Object, ReactiveTransactionInfo>usingWhen(
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification),
tx -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
this::completeTransactionAfterThrowing,
this::rollbackTransactionOnCancel)
.onErrorMap(this::unwrapIfResourceCleanupFailure))
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
}

// Any other reactive type, typically a Flux
return this.adapter.fromPublisher(TransactionContextManager.currentContext().flatMapMany(context ->
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMapMany(it -> {
try {
// Need re-wrapping until we get hold of the exception through usingWhen.
return Flux
.usingWhen(
Mono.just(it),
txInfo -> {
try {
return this.adapter.toPublisher(invocation.proceedWithInvocation());
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
(txInfo, ex) -> Mono.empty(),
this::rollbackTransactionOnCancel)
.onErrorResume(ex ->
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
}
catch (Throwable ex) {
// target invocation exception
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
}
})).contextWrite(TransactionContextManager.getOrCreateContext())
Flux.usingWhen(
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification),
tx -> {
try {
return this.adapter.toPublisher(invocation.proceedWithInvocation());
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
this::completeTransactionAfterThrowing,
this::rollbackTransactionOnCancel)
.onErrorMap(this::unwrapIfResourceCleanupFailure))
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder()));
}

Expand Down Expand Up @@ -1053,6 +1035,9 @@ private Mono<Void> completeTransactionAfterThrowing(@Nullable ReactiveTransactio
if (ex2 instanceof TransactionSystemException systemException) {
systemException.initApplicationException(ex);
}
else {
ex2.addSuppressed(ex);
}
return ex2;
}
);
Expand All @@ -1065,13 +1050,30 @@ private Mono<Void> completeTransactionAfterThrowing(@Nullable ReactiveTransactio
if (ex2 instanceof TransactionSystemException systemException) {
systemException.initApplicationException(ex);
}
else {
ex2.addSuppressed(ex);
}
return ex2;
}
);
}
}
return Mono.empty();
}

/**
* Unwrap the cause of a throwable, if produced by a failure
* during the async resource cleanup in {@link Flux#usingWhen}.
* @param ex the throwable to try to unwrap
*/
private Throwable unwrapIfResourceCleanupFailure(Throwable ex) {
if (ex instanceof RuntimeException &&
ex.getCause() != null &&
ex.getMessage().startsWith("Async resource cleanup failed")) {
return ex.getCause();
}
return ex;
}
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,6 +45,7 @@
*
* @author Mark Paluch
* @author Juergen Hoeller
* @author Enric Sala
* @since 5.2
* @see #execute
* @see ReactiveTransactionManager
Expand All @@ -69,7 +70,9 @@ default <T> Flux<T> transactional(Flux<T> flux) {
* @throws TransactionException in case of initialization, rollback, or system errors
* @throws RuntimeException if thrown by the TransactionCallback
*/
<T> Mono<T> transactional(Mono<T> mono);
default <T> Mono<T> transactional(Mono<T> mono) {
return execute(it -> mono).singleOrEmpty();
}

/**
* Execute the action specified by the given callback object within a transaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
*
* @author Mark Paluch
* @author Juergen Hoeller
* @author Enric Sala
* @since 5.2
* @see #execute
* @see ReactiveTransactionManager
Expand Down Expand Up @@ -70,40 +71,16 @@ public ReactiveTransactionManager getTransactionManager() {
return this.transactionManager;
}

@Override
public <T> Mono<T> transactional(Mono<T> mono) {
return TransactionContextManager.currentContext().flatMap(context -> {
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
// through usingWhen.
return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::rollback)
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
})
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
}

@Override
public <T> Flux<T> execute(TransactionCallback<T> action) throws TransactionException {
return TransactionContextManager.currentContext().flatMapMany(context -> {
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
// through usingWhen.
return status.flatMapMany(it -> Flux
.usingWhen(
Mono.just(it),
action::doInTransaction,
this.transactionManager::commit,
(tx, ex) -> Mono.empty(),
this.transactionManager::rollback)
.onErrorResume(ex ->
rollbackOnException(it, ex).then(Mono.error(ex))));
})
return TransactionContextManager.currentContext().flatMapMany(context ->
Flux.usingWhen(
this.transactionManager.getReactiveTransaction(this.transactionDefinition),
action::doInTransaction,
this.transactionManager::commit,
this::rollbackOnException,
this.transactionManager::rollback)
.onErrorMap(this::unwrapIfResourceCleanupFailure))
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
}
Expand All @@ -121,11 +98,28 @@ private Mono<Void> rollbackOnException(ReactiveTransaction status, Throwable ex)
if (ex2 instanceof TransactionSystemException tse) {
tse.initApplicationException(ex);
}
else {
ex2.addSuppressed(ex);
}
return ex2;
}
);
}

/**
* Unwrap the cause of a throwable, if produced by a failure
* during the async resource cleanup in {@link Flux#usingWhen}.
* @param ex the throwable to try to unwrap
*/
private Throwable unwrapIfResourceCleanupFailure(Throwable ex) {
if (ex instanceof RuntimeException &&
ex.getCause() != null &&
ex.getMessage().startsWith("Async resource cleanup failed")) {
return ex.getCause();
}
return ex;
}


@Override
public boolean equals(@Nullable Object other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,7 @@ public void cannotCommitTransaction() throws Exception {

Mono.from(itb.setName(name))
.as(StepVerifier::create)
.consumeErrorWith(throwable -> {
assertThat(throwable.getClass()).isEqualTo(RuntimeException.class);
assertThat(throwable.getCause()).isEqualTo(ex);
})
.verify();
.verifyErrorSatisfies(actual -> assertThat(actual).isEqualTo(ex));

// Should have invoked target and changed name

Expand Down
Loading

0 comments on commit edf0ae7

Please sign in to comment.