diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java index 62858154e03e..870de048e88b 100644 --- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java @@ -264,7 +264,7 @@ void testCommitFails() { .doOnNext(connection -> connection.createStatement("foo")).then() .as(operator::transactional) .as(StepVerifier::create) - .verifyError(IllegalTransactionStateException.class); + .verifyError(BadSqlGrammarException.class); verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); @@ -317,7 +317,7 @@ void testRollbackFails() { return ConnectionFactoryUtils.getConnection(connectionFactoryMock) .doOnNext(connection -> connection.createStatement("foo")).then(); }).as(StepVerifier::create) - .verifyError(IllegalTransactionStateException.class); + .verifyError(BadSqlGrammarException.class); verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index eaa853053547..64c3d3a4fd79 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -84,6 +84,7 @@ * @author Sam Brannen * @author Mark Paluch * @author Sebastien Deleuze + * @author Enric Sala * @since 1.1 * @see PlatformTransactionManager * @see ReactiveTransactionManager @@ -919,60 +920,41 @@ public Object invokeWithinTransaction(Method method, @Nullable Class targetCl !COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()))) { return TransactionContextManager.currentContext().flatMap(context -> - createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMap(it -> { - try { - // Need re-wrapping until we get hold of the exception through usingWhen. - return Mono.usingWhen( - Mono.just(it), - txInfo -> { - try { - return (Mono) invocation.proceedWithInvocation(); - } - catch (Throwable ex) { - return Mono.error(ex); - } - }, - this::commitTransactionAfterReturning, - (txInfo, err) -> Mono.empty(), - this::rollbackTransactionOnCancel) - .onErrorResume(ex -> - completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); - } - catch (Throwable ex) { - // target invocation exception - return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)); - } - })).contextWrite(TransactionContextManager.getOrCreateContext()) + Mono.usingWhen( + createTransactionIfNecessary(rtm, txAttr, joinpointIdentification), + tx -> { + try { + return (Mono) invocation.proceedWithInvocation(); + } + catch (Throwable ex) { + return Mono.error(ex); + } + }, + this::commitTransactionAfterReturning, + this::completeTransactionAfterThrowing, + this::rollbackTransactionOnCancel) + .onErrorMap(this::unwrapIfResourceCleanupFailure)) + .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder()); } // Any other reactive type, typically a Flux return this.adapter.fromPublisher(TransactionContextManager.currentContext().flatMapMany(context -> - createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMapMany(it -> { - try { - // Need re-wrapping until we get hold of the exception through usingWhen. - return Flux - .usingWhen( - Mono.just(it), - txInfo -> { - try { - return this.adapter.toPublisher(invocation.proceedWithInvocation()); - } - catch (Throwable ex) { - return Mono.error(ex); - } - }, - this::commitTransactionAfterReturning, - (txInfo, ex) -> Mono.empty(), - this::rollbackTransactionOnCancel) - .onErrorResume(ex -> - completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); - } - catch (Throwable ex) { - // target invocation exception - return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)); - } - })).contextWrite(TransactionContextManager.getOrCreateContext()) + Flux.usingWhen( + createTransactionIfNecessary(rtm, txAttr, joinpointIdentification), + tx -> { + try { + return this.adapter.toPublisher(invocation.proceedWithInvocation()); + } + catch (Throwable ex) { + return Mono.error(ex); + } + }, + this::commitTransactionAfterReturning, + this::completeTransactionAfterThrowing, + this::rollbackTransactionOnCancel) + .onErrorMap(this::unwrapIfResourceCleanupFailure)) + .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder())); } @@ -1053,6 +1035,9 @@ private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactio if (ex2 instanceof TransactionSystemException systemException) { systemException.initApplicationException(ex); } + else { + ex2.addSuppressed(ex); + } return ex2; } ); @@ -1065,6 +1050,9 @@ private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactio if (ex2 instanceof TransactionSystemException systemException) { systemException.initApplicationException(ex); } + else { + ex2.addSuppressed(ex); + } return ex2; } ); @@ -1072,6 +1060,20 @@ private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactio } return Mono.empty(); } + + /** + * Unwrap the cause of a throwable, if produced by a failure + * during the async resource cleanup in {@link Flux#usingWhen}. + * @param ex the throwable to try to unwrap + */ + private Throwable unwrapIfResourceCleanupFailure(Throwable ex) { + if (ex instanceof RuntimeException && + ex.getCause() != null && + ex.getMessage().startsWith("Async resource cleanup failed")) { + return ex.getCause(); + } + return ex; + } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java index 46afd7a3038a..9fdc0b99dc88 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,6 +45,7 @@ * * @author Mark Paluch * @author Juergen Hoeller + * @author Enric Sala * @since 5.2 * @see #execute * @see ReactiveTransactionManager @@ -69,7 +70,9 @@ default Flux transactional(Flux flux) { * @throws TransactionException in case of initialization, rollback, or system errors * @throws RuntimeException if thrown by the TransactionCallback */ - Mono transactional(Mono mono); + default Mono transactional(Mono mono) { + return execute(it -> mono).singleOrEmpty(); + } /** * Execute the action specified by the given callback object within a transaction. diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java index 09733a83a57d..e40057c22d51 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java @@ -35,6 +35,7 @@ * * @author Mark Paluch * @author Juergen Hoeller + * @author Enric Sala * @since 5.2 * @see #execute * @see ReactiveTransactionManager @@ -70,40 +71,16 @@ public ReactiveTransactionManager getTransactionManager() { return this.transactionManager; } - @Override - public Mono transactional(Mono mono) { - return TransactionContextManager.currentContext().flatMap(context -> { - Mono status = this.transactionManager.getReactiveTransaction(this.transactionDefinition); - // This is an around advice: Invoke the next interceptor in the chain. - // This will normally result in a target object being invoked. - // Need re-wrapping of ReactiveTransaction until we get hold of the exception - // through usingWhen. - return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono, - this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::rollback) - .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex)))); - }) - .contextWrite(TransactionContextManager.getOrCreateContext()) - .contextWrite(TransactionContextManager.getOrCreateContextHolder()); - } - @Override public Flux execute(TransactionCallback action) throws TransactionException { - return TransactionContextManager.currentContext().flatMapMany(context -> { - Mono status = this.transactionManager.getReactiveTransaction(this.transactionDefinition); - // This is an around advice: Invoke the next interceptor in the chain. - // This will normally result in a target object being invoked. - // Need re-wrapping of ReactiveTransaction until we get hold of the exception - // through usingWhen. - return status.flatMapMany(it -> Flux - .usingWhen( - Mono.just(it), - action::doInTransaction, - this.transactionManager::commit, - (tx, ex) -> Mono.empty(), - this.transactionManager::rollback) - .onErrorResume(ex -> - rollbackOnException(it, ex).then(Mono.error(ex)))); - }) + return TransactionContextManager.currentContext().flatMapMany(context -> + Flux.usingWhen( + this.transactionManager.getReactiveTransaction(this.transactionDefinition), + action::doInTransaction, + this.transactionManager::commit, + this::rollbackOnException, + this.transactionManager::rollback) + .onErrorMap(this::unwrapIfResourceCleanupFailure)) .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder()); } @@ -121,11 +98,28 @@ private Mono rollbackOnException(ReactiveTransaction status, Throwable ex) if (ex2 instanceof TransactionSystemException tse) { tse.initApplicationException(ex); } + else { + ex2.addSuppressed(ex); + } return ex2; } ); } + /** + * Unwrap the cause of a throwable, if produced by a failure + * during the async resource cleanup in {@link Flux#usingWhen}. + * @param ex the throwable to try to unwrap + */ + private Throwable unwrapIfResourceCleanupFailure(Throwable ex) { + if (ex instanceof RuntimeException && + ex.getCause() != null && + ex.getMessage().startsWith("Async resource cleanup failed")) { + return ex.getCause(); + } + return ex; + } + @Override public boolean equals(@Nullable Object other) { diff --git a/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java b/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java index 09b2f47646b8..987dc38c6880 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java @@ -335,11 +335,7 @@ public void cannotCommitTransaction() throws Exception { Mono.from(itb.setName(name)) .as(StepVerifier::create) - .consumeErrorWith(throwable -> { - assertThat(throwable.getClass()).isEqualTo(RuntimeException.class); - assertThat(throwable.getCause()).isEqualTo(ex); - }) - .verify(); + .verifyErrorSatisfies(actual -> assertThat(actual).isEqualTo(ex)); // Should have invoked target and changed name diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java index a0827bcd7910..1381a90e1525 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,21 +16,29 @@ package org.springframework.transaction.reactive; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import reactor.test.publisher.PublisherProbe; +import org.springframework.transaction.ReactiveTransaction; +import org.springframework.transaction.ReactiveTransactionManager; import org.springframework.transaction.support.DefaultTransactionDefinition; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; /** * Tests for {@link TransactionalOperator}. * * @author Mark Paluch + * @author Enric Sala */ public class TransactionalOperatorTests { @@ -99,6 +107,43 @@ public void rollbackWithMono() { assertThat(tm.rollback).isTrue(); } + @Test + public void commitFailureWithMono() { + ReactiveTransactionManager tm = mock(ReactiveTransactionManager.class); + given(tm.getReactiveTransaction(any())).willReturn(Mono.just(mock(ReactiveTransaction.class))); + PublisherProbe commit = PublisherProbe.of(Mono.error(IOException::new)); + given(tm.commit(any())).willReturn(commit.mono()); + PublisherProbe rollback = PublisherProbe.empty(); + given(tm.rollback(any())).willReturn(rollback.mono()); + + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); + Mono.just(true).as(operator::transactional) + .as(StepVerifier::create) + .verifyError(IOException.class); + assertThat(commit.subscribeCount()).isEqualTo(1); + rollback.assertWasNotSubscribed(); + } + + @Test + public void rollbackFailureWithMono() { + ReactiveTransactionManager tm = mock(ReactiveTransactionManager.class); + given(tm.getReactiveTransaction(any())).willReturn(Mono.just(mock(ReactiveTransaction.class))); + PublisherProbe commit = PublisherProbe.empty(); + given(tm.commit(any())).willReturn(commit.mono()); + PublisherProbe rollback = PublisherProbe.of(Mono.error(IOException::new)); + given(tm.rollback(any())).willReturn(rollback.mono()); + + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); + IllegalStateException actionFailure = new IllegalStateException(); + Mono.error(actionFailure).as(operator::transactional) + .as(StepVerifier::create) + .verifyErrorSatisfies(ex -> assertThat(ex) + .isInstanceOf(IOException.class) + .hasSuppressedException(actionFailure)); + commit.assertWasNotSubscribed(); + assertThat(rollback.subscribeCount()).isEqualTo(1); + } + @Test public void commitWithFlux() { TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); @@ -120,4 +165,43 @@ public void rollbackWithFlux() { assertThat(tm.rollback).isTrue(); } + @Test + public void commitFailureWithFlux() { + ReactiveTransactionManager tm = mock(ReactiveTransactionManager.class); + given(tm.getReactiveTransaction(any())).willReturn(Mono.just(mock(ReactiveTransaction.class))); + PublisherProbe commit = PublisherProbe.of(Mono.error(IOException::new)); + given(tm.commit(any())).willReturn(commit.mono()); + PublisherProbe rollback = PublisherProbe.empty(); + given(tm.rollback(any())).willReturn(rollback.mono()); + + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); + Flux.just(1, 2, 3, 4).as(operator::transactional) + .as(StepVerifier::create) + .expectNextCount(4) + .verifyError(IOException.class); + assertThat(commit.subscribeCount()).isEqualTo(1); + rollback.assertWasNotSubscribed(); + } + + @Test + public void rollbackFailureWithFlux() { + ReactiveTransactionManager tm = mock(ReactiveTransactionManager.class); + given(tm.getReactiveTransaction(any())).willReturn(Mono.just(mock(ReactiveTransaction.class))); + PublisherProbe commit = PublisherProbe.empty(); + given(tm.commit(any())).willReturn(commit.mono()); + PublisherProbe rollback = PublisherProbe.of(Mono.error(IOException::new)); + given(tm.rollback(any())).willReturn(rollback.mono()); + + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); + IllegalStateException actionFailure = new IllegalStateException(); + Flux.just(1, 2, 3).concatWith(Flux.error(actionFailure)).as(operator::transactional) + .as(StepVerifier::create) + .expectNextCount(3) + .verifyErrorSatisfies(ex -> assertThat(ex) + .isInstanceOf(IOException.class) + .hasSuppressedException(actionFailure)); + commit.assertWasNotSubscribed(); + assertThat(rollback.subscribeCount()).isEqualTo(1); + } + } diff --git a/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt b/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt index f4c4ba096f47..e141188f5bff 100644 --- a/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt +++ b/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt @@ -290,9 +290,9 @@ abstract class AbstractCoroutinesTransactionAspectTests { try { itb.setName(name) } - catch (ex: Exception) { - assertThat(ex).isInstanceOf(RuntimeException::class.java) - assertThat(ex.cause).hasMessage(ex.message).isInstanceOf(ex::class.java) + catch (actual: Exception) { + assertThat(actual).isInstanceOf(ex.javaClass) + assertThat(actual).hasMessage(ex.message) } // Should have invoked target and changed name assertThat(itb.getName()).isEqualTo(name)