Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid rollback after a commit failure in TransactionalOperator #27572

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ void testCommitFails() {
.doOnNext(connection -> connection.createStatement("foo")).then()
.as(operator::transactional)
.as(StepVerifier::create)
.verifyError(IllegalTransactionStateException.class);
.verifyError(BadSqlGrammarException.class);

verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
Expand Down Expand Up @@ -317,7 +317,7 @@ void testRollbackFails() {
return ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.doOnNext(connection -> connection.createStatement("foo")).then();
}).as(StepVerifier::create)
.verifyError(IllegalTransactionStateException.class);
.verifyError(BadSqlGrammarException.class);

verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
* @author Sam Brannen
* @author Mark Paluch
* @author Sebastien Deleuze
* @author Enric Sala
* @since 1.1
* @see PlatformTransactionManager
* @see ReactiveTransactionManager
Expand Down Expand Up @@ -919,60 +920,41 @@ public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetCl
!COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()))) {

return TransactionContextManager.currentContext().flatMap(context ->
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMap(it -> {
try {
// Need re-wrapping until we get hold of the exception through usingWhen.
return Mono.<Object, ReactiveTransactionInfo>usingWhen(
Mono.just(it),
txInfo -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
(txInfo, err) -> Mono.empty(),
this::rollbackTransactionOnCancel)
.onErrorResume(ex ->
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
}
catch (Throwable ex) {
// target invocation exception
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
}
})).contextWrite(TransactionContextManager.getOrCreateContext())
Mono.<Object, ReactiveTransactionInfo>usingWhen(
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification),
tx -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
this::completeTransactionAfterThrowing,
this::rollbackTransactionOnCancel)
.onErrorMap(this::unwrapIfResourceCleanupFailure))
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
}

// Any other reactive type, typically a Flux
return this.adapter.fromPublisher(TransactionContextManager.currentContext().flatMapMany(context ->
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMapMany(it -> {
try {
// Need re-wrapping until we get hold of the exception through usingWhen.
return Flux
.usingWhen(
Mono.just(it),
txInfo -> {
try {
return this.adapter.toPublisher(invocation.proceedWithInvocation());
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
(txInfo, ex) -> Mono.empty(),
this::rollbackTransactionOnCancel)
.onErrorResume(ex ->
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
}
catch (Throwable ex) {
// target invocation exception
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
}
})).contextWrite(TransactionContextManager.getOrCreateContext())
Flux.usingWhen(
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification),
tx -> {
try {
return this.adapter.toPublisher(invocation.proceedWithInvocation());
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
this::completeTransactionAfterThrowing,
this::rollbackTransactionOnCancel)
.onErrorMap(this::unwrapIfResourceCleanupFailure))
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder()));
}

Expand Down Expand Up @@ -1053,6 +1035,9 @@ private Mono<Void> completeTransactionAfterThrowing(@Nullable ReactiveTransactio
if (ex2 instanceof TransactionSystemException systemException) {
systemException.initApplicationException(ex);
}
else {
ex2.addSuppressed(ex);
}
return ex2;
}
);
Expand All @@ -1065,13 +1050,30 @@ private Mono<Void> completeTransactionAfterThrowing(@Nullable ReactiveTransactio
if (ex2 instanceof TransactionSystemException systemException) {
systemException.initApplicationException(ex);
}
else {
ex2.addSuppressed(ex);
}
return ex2;
}
);
}
}
return Mono.empty();
}

/**
* Unwrap the cause of a throwable, if produced by a failure
* during the async resource cleanup in {@link Flux#usingWhen}.
* @param ex the throwable to try to unwrap
*/
private Throwable unwrapIfResourceCleanupFailure(Throwable ex) {
if (ex instanceof RuntimeException &&
ex.getCause() != null &&
ex.getMessage().startsWith("Async resource cleanup failed")) {
return ex.getCause();
}
return ex;
}
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,6 +45,7 @@
*
* @author Mark Paluch
* @author Juergen Hoeller
* @author Enric Sala
* @since 5.2
* @see #execute
* @see ReactiveTransactionManager
Expand All @@ -69,7 +70,9 @@ default <T> Flux<T> transactional(Flux<T> flux) {
* @throws TransactionException in case of initialization, rollback, or system errors
* @throws RuntimeException if thrown by the TransactionCallback
*/
<T> Mono<T> transactional(Mono<T> mono);
default <T> Mono<T> transactional(Mono<T> mono) {
return execute(it -> mono).singleOrEmpty();
Comment on lines +73 to +74
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A similar default implementation existed in the past but using .next() instead of .singleOrEmpty(). This was changed in gh-23562 because .next() triggers the cancellation of the source.

I believe we could use .singleOrEmpty() which IIUC should not cancel the source before receiving the completion signal from the Mono.

Please let me know if this exceeds the scope of the issue and I would drop it from the PR.

}

/**
* Execute the action specified by the given callback object within a transaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
*
* @author Mark Paluch
* @author Juergen Hoeller
* @author Enric Sala
* @since 5.2
* @see #execute
* @see ReactiveTransactionManager
Expand Down Expand Up @@ -70,40 +71,16 @@ public ReactiveTransactionManager getTransactionManager() {
return this.transactionManager;
}

@Override
public <T> Mono<T> transactional(Mono<T> mono) {
return TransactionContextManager.currentContext().flatMap(context -> {
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
// through usingWhen.
return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::rollback)
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
})
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
}

@Override
public <T> Flux<T> execute(TransactionCallback<T> action) throws TransactionException {
return TransactionContextManager.currentContext().flatMapMany(context -> {
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
// through usingWhen.
return status.flatMapMany(it -> Flux
.usingWhen(
Mono.just(it),
Comment on lines -93 to -99
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hesitant about filing a PR because I wasn't sure I was fully understanding this part.

IIUC we could now simplify this because the rollbackOnException has been moved to inside the usingWhen.

Relates to this comment: https://github.com/spring-projects/spring-framework/pull/23562/files#r319925489

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous code commits the transaction depending on successful/exceptional completion/cancellation. Any failure in the async cleanup methods (specifically commit because rollback happens upon cancellation) dropped into rollbackOnException which performs another round of cleanup and that is the actual bug.

To solve the issue, we need to call rollbackOnException in the transaction cleanup instead of onErrorResume(…) and (tx, ex) -> Mono.empty(). That would align the flow with the imperative TransactionTemplate.

It is possible that we need to refine the actual exception if it is emitted by the async clean up afterward.

Note that the same issue exists in TransactionAspectSupport.ReactiveTransactionSupport.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback @mp911de :) Do you mean something like this?

Flux.usingWhen(
	this.transactionManager.getReactiveTransaction(this.transactionDefinition),
	action::doInTransaction,
	this.transactionManager::commit,
	this::rollbackOnException,
	this.transactionManager::rollback))

Was indeed going for something like this initially but as you mentioned it would require refining the exception, because a failure on either asyncComplete or asyncError wraps the exception like this:

java.lang.RuntimeException: Async resource cleanup failed after onComplete|onError

Maybe that's ok? But that's why I thought that the .onErrorResume(rollback) + .concatWith(commit) combination may achieve something similar and avoid the exception refining. Do you think it would be preferable to go with the asyncComplete&asyncError + exception cleanup approach?

Note that the same issue exists in TransactionAspectSupport.ReactiveTransactionSupport.

Good point! Shall I apply these changes also over there, or do you think separate PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That bit looks decent. The more operators we use the more performance impact we can generate. Regarding exception mapping, I'm not sure which top-level exception should be propagated downstream. Looking into the transaction manager, a RuntimeException seems fine. Maybe @jhoeller can provide a bit more guidance.

Shall I apply these changes also over there, or do you think separate PR?

The issue is the same and it makes sense to fix the same problem in multiple places at once as we can keep the context within one commit from your side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed the changes. Went ahead with a message-based approach to the unwrapping, maybe there is a utility for this? 🤔 Didn't find anything in reactor.core.Exceptions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Paging @simonbasle, maybe Simon can give further insights.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there isn't such an unwrapping utility currently, not for the sake of detecting that particular kind of exception.

@EnricSala note that in the case of a rollback which fails, we have a RuntimeException with:

  1. the cause of the rollback failure as the getCause()
  2. the original exception in the usingWhen which triggered the rollback in the first place (the rollbackCause) as a suppressed exception

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reply! Agree, the exception propagated by usingWhen meets the criteria so leaving it untouched would be an option 👍

I squashed the changes on the branch leaving only two commits: the first one shows the implementation with exception unwrapping and the second commit shows what it would look like if we drop the unwrapping.

Both options resolve my problem, so I think at this point it may be a matter of preference or consistency with the rest of the framework. Please let me know which option would be preferable :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense to unwrap the exception as callers typically do not expect a generic RuntimeException.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also have a preference for this option because the RuntimeException feels a bit synthetic, it's only there due to the usingWhen implementation, and it doesn't align with the behavior of non-reactive transactions.

I have applied the change, it should be ready for review :)

action::doInTransaction,
this.transactionManager::commit,
(tx, ex) -> Mono.empty(),
this.transactionManager::rollback)
.onErrorResume(ex ->
rollbackOnException(it, ex).then(Mono.error(ex))));
})
return TransactionContextManager.currentContext().flatMapMany(context ->
Flux.usingWhen(
this.transactionManager.getReactiveTransaction(this.transactionDefinition),
action::doInTransaction,
this.transactionManager::commit,
this::rollbackOnException,
this.transactionManager::rollback)
.onErrorMap(this::unwrapIfResourceCleanupFailure))
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
}
Expand All @@ -121,11 +98,28 @@ private Mono<Void> rollbackOnException(ReactiveTransaction status, Throwable ex)
if (ex2 instanceof TransactionSystemException tse) {
tse.initApplicationException(ex);
}
else {
ex2.addSuppressed(ex);
}
return ex2;
}
);
}

/**
* Unwrap the cause of a throwable, if produced by a failure
* during the async resource cleanup in {@link Flux#usingWhen}.
* @param ex the throwable to try to unwrap
*/
private Throwable unwrapIfResourceCleanupFailure(Throwable ex) {
if (ex instanceof RuntimeException &&
ex.getCause() != null &&
ex.getMessage().startsWith("Async resource cleanup failed")) {
return ex.getCause();
}
return ex;
}


@Override
public boolean equals(@Nullable Object other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,7 @@ public void cannotCommitTransaction() throws Exception {

Mono.from(itb.setName(name))
.as(StepVerifier::create)
.consumeErrorWith(throwable -> {
assertThat(throwable.getClass()).isEqualTo(RuntimeException.class);
assertThat(throwable.getCause()).isEqualTo(ex);
})
.verify();
.verifyErrorSatisfies(actual -> assertThat(actual).isEqualTo(ex));

// Should have invoked target and changed name

Expand Down
Loading