From 0881f0920a60ab3f756edf2e6f4a781dad169609 Mon Sep 17 00:00:00 2001 From: Enric Sala Date: Mon, 18 Oct 2021 11:35:45 +0200 Subject: [PATCH 1/4] Option 1: unwrap the `RuntimeException` from `usingWhen` --- .../R2dbcTransactionManagerUnitTests.java | 4 +- .../interceptor/TransactionAspectSupport.java | 100 +++++++++--------- .../reactive/TransactionalOperator.java | 7 +- .../reactive/TransactionalOperatorImpl.java | 58 +++++----- ...bstractReactiveTransactionAspectTests.java | 6 +- .../reactive/TransactionalOperatorTests.java | 86 ++++++++++++++- ...bstractCoroutinesTransactionAspectTests.kt | 5 +- 7 files changed, 172 insertions(+), 94 deletions(-) diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java index 62858154e03e..870de048e88b 100644 --- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java @@ -264,7 +264,7 @@ void testCommitFails() { .doOnNext(connection -> connection.createStatement("foo")).then() .as(operator::transactional) .as(StepVerifier::create) - .verifyError(IllegalTransactionStateException.class); + .verifyError(BadSqlGrammarException.class); verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); @@ -317,7 +317,7 @@ void testRollbackFails() { return ConnectionFactoryUtils.getConnection(connectionFactoryMock) .doOnNext(connection -> connection.createStatement("foo")).then(); }).as(StepVerifier::create) - .verifyError(IllegalTransactionStateException.class); + .verifyError(BadSqlGrammarException.class); verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index eaa853053547..64c3d3a4fd79 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -84,6 +84,7 @@ * @author Sam Brannen * @author Mark Paluch * @author Sebastien Deleuze + * @author Enric Sala * @since 1.1 * @see PlatformTransactionManager * @see ReactiveTransactionManager @@ -919,60 +920,41 @@ public Object invokeWithinTransaction(Method method, @Nullable Class targetCl !COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()))) { return TransactionContextManager.currentContext().flatMap(context -> - createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMap(it -> { - try { - // Need re-wrapping until we get hold of the exception through usingWhen. - return Mono.usingWhen( - Mono.just(it), - txInfo -> { - try { - return (Mono) invocation.proceedWithInvocation(); - } - catch (Throwable ex) { - return Mono.error(ex); - } - }, - this::commitTransactionAfterReturning, - (txInfo, err) -> Mono.empty(), - this::rollbackTransactionOnCancel) - .onErrorResume(ex -> - completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); - } - catch (Throwable ex) { - // target invocation exception - return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)); - } - })).contextWrite(TransactionContextManager.getOrCreateContext()) + Mono.usingWhen( + createTransactionIfNecessary(rtm, txAttr, joinpointIdentification), + tx -> { + try { + return (Mono) invocation.proceedWithInvocation(); + } + catch (Throwable ex) { + return Mono.error(ex); + } + }, + this::commitTransactionAfterReturning, + this::completeTransactionAfterThrowing, + this::rollbackTransactionOnCancel) + .onErrorMap(this::unwrapIfResourceCleanupFailure)) + .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder()); } // Any other reactive type, typically a Flux return this.adapter.fromPublisher(TransactionContextManager.currentContext().flatMapMany(context -> - createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMapMany(it -> { - try { - // Need re-wrapping until we get hold of the exception through usingWhen. - return Flux - .usingWhen( - Mono.just(it), - txInfo -> { - try { - return this.adapter.toPublisher(invocation.proceedWithInvocation()); - } - catch (Throwable ex) { - return Mono.error(ex); - } - }, - this::commitTransactionAfterReturning, - (txInfo, ex) -> Mono.empty(), - this::rollbackTransactionOnCancel) - .onErrorResume(ex -> - completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); - } - catch (Throwable ex) { - // target invocation exception - return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)); - } - })).contextWrite(TransactionContextManager.getOrCreateContext()) + Flux.usingWhen( + createTransactionIfNecessary(rtm, txAttr, joinpointIdentification), + tx -> { + try { + return this.adapter.toPublisher(invocation.proceedWithInvocation()); + } + catch (Throwable ex) { + return Mono.error(ex); + } + }, + this::commitTransactionAfterReturning, + this::completeTransactionAfterThrowing, + this::rollbackTransactionOnCancel) + .onErrorMap(this::unwrapIfResourceCleanupFailure)) + .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder())); } @@ -1053,6 +1035,9 @@ private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactio if (ex2 instanceof TransactionSystemException systemException) { systemException.initApplicationException(ex); } + else { + ex2.addSuppressed(ex); + } return ex2; } ); @@ -1065,6 +1050,9 @@ private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactio if (ex2 instanceof TransactionSystemException systemException) { systemException.initApplicationException(ex); } + else { + ex2.addSuppressed(ex); + } return ex2; } ); @@ -1072,6 +1060,20 @@ private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactio } return Mono.empty(); } + + /** + * Unwrap the cause of a throwable, if produced by a failure + * during the async resource cleanup in {@link Flux#usingWhen}. + * @param ex the throwable to try to unwrap + */ + private Throwable unwrapIfResourceCleanupFailure(Throwable ex) { + if (ex instanceof RuntimeException && + ex.getCause() != null && + ex.getMessage().startsWith("Async resource cleanup failed")) { + return ex.getCause(); + } + return ex; + } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java index 46afd7a3038a..9fdc0b99dc88 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,6 +45,7 @@ * * @author Mark Paluch * @author Juergen Hoeller + * @author Enric Sala * @since 5.2 * @see #execute * @see ReactiveTransactionManager @@ -69,7 +70,9 @@ default Flux transactional(Flux flux) { * @throws TransactionException in case of initialization, rollback, or system errors * @throws RuntimeException if thrown by the TransactionCallback */ - Mono transactional(Mono mono); + default Mono transactional(Mono mono) { + return execute(it -> mono).singleOrEmpty(); + } /** * Execute the action specified by the given callback object within a transaction. diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java index 09733a83a57d..e40057c22d51 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java @@ -35,6 +35,7 @@ * * @author Mark Paluch * @author Juergen Hoeller + * @author Enric Sala * @since 5.2 * @see #execute * @see ReactiveTransactionManager @@ -70,40 +71,16 @@ public ReactiveTransactionManager getTransactionManager() { return this.transactionManager; } - @Override - public Mono transactional(Mono mono) { - return TransactionContextManager.currentContext().flatMap(context -> { - Mono status = this.transactionManager.getReactiveTransaction(this.transactionDefinition); - // This is an around advice: Invoke the next interceptor in the chain. - // This will normally result in a target object being invoked. - // Need re-wrapping of ReactiveTransaction until we get hold of the exception - // through usingWhen. - return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono, - this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::rollback) - .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex)))); - }) - .contextWrite(TransactionContextManager.getOrCreateContext()) - .contextWrite(TransactionContextManager.getOrCreateContextHolder()); - } - @Override public Flux execute(TransactionCallback action) throws TransactionException { - return TransactionContextManager.currentContext().flatMapMany(context -> { - Mono status = this.transactionManager.getReactiveTransaction(this.transactionDefinition); - // This is an around advice: Invoke the next interceptor in the chain. - // This will normally result in a target object being invoked. - // Need re-wrapping of ReactiveTransaction until we get hold of the exception - // through usingWhen. - return status.flatMapMany(it -> Flux - .usingWhen( - Mono.just(it), - action::doInTransaction, - this.transactionManager::commit, - (tx, ex) -> Mono.empty(), - this.transactionManager::rollback) - .onErrorResume(ex -> - rollbackOnException(it, ex).then(Mono.error(ex)))); - }) + return TransactionContextManager.currentContext().flatMapMany(context -> + Flux.usingWhen( + this.transactionManager.getReactiveTransaction(this.transactionDefinition), + action::doInTransaction, + this.transactionManager::commit, + this::rollbackOnException, + this.transactionManager::rollback) + .onErrorMap(this::unwrapIfResourceCleanupFailure)) .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder()); } @@ -121,11 +98,28 @@ private Mono rollbackOnException(ReactiveTransaction status, Throwable ex) if (ex2 instanceof TransactionSystemException tse) { tse.initApplicationException(ex); } + else { + ex2.addSuppressed(ex); + } return ex2; } ); } + /** + * Unwrap the cause of a throwable, if produced by a failure + * during the async resource cleanup in {@link Flux#usingWhen}. + * @param ex the throwable to try to unwrap + */ + private Throwable unwrapIfResourceCleanupFailure(Throwable ex) { + if (ex instanceof RuntimeException && + ex.getCause() != null && + ex.getMessage().startsWith("Async resource cleanup failed")) { + return ex.getCause(); + } + return ex; + } + @Override public boolean equals(@Nullable Object other) { diff --git a/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java b/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java index 09b2f47646b8..987dc38c6880 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java @@ -335,11 +335,7 @@ public void cannotCommitTransaction() throws Exception { Mono.from(itb.setName(name)) .as(StepVerifier::create) - .consumeErrorWith(throwable -> { - assertThat(throwable.getClass()).isEqualTo(RuntimeException.class); - assertThat(throwable.getCause()).isEqualTo(ex); - }) - .verify(); + .verifyErrorSatisfies(actual -> assertThat(actual).isEqualTo(ex)); // Should have invoked target and changed name diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java index a0827bcd7910..1381a90e1525 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,21 +16,29 @@ package org.springframework.transaction.reactive; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import reactor.test.publisher.PublisherProbe; +import org.springframework.transaction.ReactiveTransaction; +import org.springframework.transaction.ReactiveTransactionManager; import org.springframework.transaction.support.DefaultTransactionDefinition; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; /** * Tests for {@link TransactionalOperator}. * * @author Mark Paluch + * @author Enric Sala */ public class TransactionalOperatorTests { @@ -99,6 +107,43 @@ public void rollbackWithMono() { assertThat(tm.rollback).isTrue(); } + @Test + public void commitFailureWithMono() { + ReactiveTransactionManager tm = mock(ReactiveTransactionManager.class); + given(tm.getReactiveTransaction(any())).willReturn(Mono.just(mock(ReactiveTransaction.class))); + PublisherProbe commit = PublisherProbe.of(Mono.error(IOException::new)); + given(tm.commit(any())).willReturn(commit.mono()); + PublisherProbe rollback = PublisherProbe.empty(); + given(tm.rollback(any())).willReturn(rollback.mono()); + + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); + Mono.just(true).as(operator::transactional) + .as(StepVerifier::create) + .verifyError(IOException.class); + assertThat(commit.subscribeCount()).isEqualTo(1); + rollback.assertWasNotSubscribed(); + } + + @Test + public void rollbackFailureWithMono() { + ReactiveTransactionManager tm = mock(ReactiveTransactionManager.class); + given(tm.getReactiveTransaction(any())).willReturn(Mono.just(mock(ReactiveTransaction.class))); + PublisherProbe commit = PublisherProbe.empty(); + given(tm.commit(any())).willReturn(commit.mono()); + PublisherProbe rollback = PublisherProbe.of(Mono.error(IOException::new)); + given(tm.rollback(any())).willReturn(rollback.mono()); + + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); + IllegalStateException actionFailure = new IllegalStateException(); + Mono.error(actionFailure).as(operator::transactional) + .as(StepVerifier::create) + .verifyErrorSatisfies(ex -> assertThat(ex) + .isInstanceOf(IOException.class) + .hasSuppressedException(actionFailure)); + commit.assertWasNotSubscribed(); + assertThat(rollback.subscribeCount()).isEqualTo(1); + } + @Test public void commitWithFlux() { TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); @@ -120,4 +165,43 @@ public void rollbackWithFlux() { assertThat(tm.rollback).isTrue(); } + @Test + public void commitFailureWithFlux() { + ReactiveTransactionManager tm = mock(ReactiveTransactionManager.class); + given(tm.getReactiveTransaction(any())).willReturn(Mono.just(mock(ReactiveTransaction.class))); + PublisherProbe commit = PublisherProbe.of(Mono.error(IOException::new)); + given(tm.commit(any())).willReturn(commit.mono()); + PublisherProbe rollback = PublisherProbe.empty(); + given(tm.rollback(any())).willReturn(rollback.mono()); + + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); + Flux.just(1, 2, 3, 4).as(operator::transactional) + .as(StepVerifier::create) + .expectNextCount(4) + .verifyError(IOException.class); + assertThat(commit.subscribeCount()).isEqualTo(1); + rollback.assertWasNotSubscribed(); + } + + @Test + public void rollbackFailureWithFlux() { + ReactiveTransactionManager tm = mock(ReactiveTransactionManager.class); + given(tm.getReactiveTransaction(any())).willReturn(Mono.just(mock(ReactiveTransaction.class))); + PublisherProbe commit = PublisherProbe.empty(); + given(tm.commit(any())).willReturn(commit.mono()); + PublisherProbe rollback = PublisherProbe.of(Mono.error(IOException::new)); + given(tm.rollback(any())).willReturn(rollback.mono()); + + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); + IllegalStateException actionFailure = new IllegalStateException(); + Flux.just(1, 2, 3).concatWith(Flux.error(actionFailure)).as(operator::transactional) + .as(StepVerifier::create) + .expectNextCount(3) + .verifyErrorSatisfies(ex -> assertThat(ex) + .isInstanceOf(IOException.class) + .hasSuppressedException(actionFailure)); + commit.assertWasNotSubscribed(); + assertThat(rollback.subscribeCount()).isEqualTo(1); + } + } diff --git a/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt b/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt index f4c4ba096f47..6909ce70e993 100644 --- a/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt +++ b/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt @@ -290,9 +290,8 @@ abstract class AbstractCoroutinesTransactionAspectTests { try { itb.setName(name) } - catch (ex: Exception) { - assertThat(ex).isInstanceOf(RuntimeException::class.java) - assertThat(ex.cause).hasMessage(ex.message).isInstanceOf(ex::class.java) + catch (actual: Exception) { + assertThat(actual).isEqualTo(ex) } // Should have invoked target and changed name assertThat(itb.getName()).isEqualTo(name) From 69fd5d557d068e4912dd980fdfd65837e0bdd858 Mon Sep 17 00:00:00 2001 From: Enric Sala Date: Sat, 23 Oct 2021 15:18:17 +0200 Subject: [PATCH 2/4] Option 2: drop the unwrapping, propagate `RuntimeException` --- .../R2dbcTransactionManagerUnitTests.java | 8 ++++-- .../interceptor/TransactionAspectSupport.java | 26 ++----------------- .../reactive/TransactionalOperatorImpl.java | 20 +------------- ...bstractReactiveTransactionAspectTests.java | 8 ++++-- .../reactive/TransactionalOperatorTests.java | 14 +++++++--- ...bstractCoroutinesTransactionAspectTests.kt | 12 +++++++-- 6 files changed, 35 insertions(+), 53 deletions(-) diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java index 870de048e88b..534f3da13190 100644 --- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java @@ -264,7 +264,9 @@ void testCommitFails() { .doOnNext(connection -> connection.createStatement("foo")).then() .as(operator::transactional) .as(StepVerifier::create) - .verifyError(BadSqlGrammarException.class); + .verifyErrorSatisfies(ex -> assertThat(ex) + .isExactlyInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(BadSqlGrammarException.class)); verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); @@ -317,7 +319,9 @@ void testRollbackFails() { return ConnectionFactoryUtils.getConnection(connectionFactoryMock) .doOnNext(connection -> connection.createStatement("foo")).then(); }).as(StepVerifier::create) - .verifyError(BadSqlGrammarException.class); + .verifyErrorSatisfies(ex -> assertThat(ex) + .isExactlyInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(BadSqlGrammarException.class)); verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index 64c3d3a4fd79..e23a230cdd70 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -932,8 +932,7 @@ public Object invokeWithinTransaction(Method method, @Nullable Class targetCl }, this::commitTransactionAfterReturning, this::completeTransactionAfterThrowing, - this::rollbackTransactionOnCancel) - .onErrorMap(this::unwrapIfResourceCleanupFailure)) + this::rollbackTransactionOnCancel)) .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder()); } @@ -952,8 +951,7 @@ public Object invokeWithinTransaction(Method method, @Nullable Class targetCl }, this::commitTransactionAfterReturning, this::completeTransactionAfterThrowing, - this::rollbackTransactionOnCancel) - .onErrorMap(this::unwrapIfResourceCleanupFailure)) + this::rollbackTransactionOnCancel)) .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder())); } @@ -1035,9 +1033,6 @@ private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactio if (ex2 instanceof TransactionSystemException systemException) { systemException.initApplicationException(ex); } - else { - ex2.addSuppressed(ex); - } return ex2; } ); @@ -1050,9 +1045,6 @@ private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactio if (ex2 instanceof TransactionSystemException systemException) { systemException.initApplicationException(ex); } - else { - ex2.addSuppressed(ex); - } return ex2; } ); @@ -1060,20 +1052,6 @@ private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactio } return Mono.empty(); } - - /** - * Unwrap the cause of a throwable, if produced by a failure - * during the async resource cleanup in {@link Flux#usingWhen}. - * @param ex the throwable to try to unwrap - */ - private Throwable unwrapIfResourceCleanupFailure(Throwable ex) { - if (ex instanceof RuntimeException && - ex.getCause() != null && - ex.getMessage().startsWith("Async resource cleanup failed")) { - return ex.getCause(); - } - return ex; - } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java index e40057c22d51..f64bd17508eb 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java @@ -79,8 +79,7 @@ public Flux execute(TransactionCallback action) throws TransactionExce action::doInTransaction, this.transactionManager::commit, this::rollbackOnException, - this.transactionManager::rollback) - .onErrorMap(this::unwrapIfResourceCleanupFailure)) + this.transactionManager::rollback)) .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder()); } @@ -98,28 +97,11 @@ private Mono rollbackOnException(ReactiveTransaction status, Throwable ex) if (ex2 instanceof TransactionSystemException tse) { tse.initApplicationException(ex); } - else { - ex2.addSuppressed(ex); - } return ex2; } ); } - /** - * Unwrap the cause of a throwable, if produced by a failure - * during the async resource cleanup in {@link Flux#usingWhen}. - * @param ex the throwable to try to unwrap - */ - private Throwable unwrapIfResourceCleanupFailure(Throwable ex) { - if (ex instanceof RuntimeException && - ex.getCause() != null && - ex.getMessage().startsWith("Async resource cleanup failed")) { - return ex.getCause(); - } - return ex; - } - @Override public boolean equals(@Nullable Object other) { diff --git a/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java b/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java index 987dc38c6880..42c21cf29402 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java @@ -256,7 +256,9 @@ public boolean rollbackOn(Throwable t) { .as(StepVerifier::create) .expectErrorSatisfies(actual -> { if (rollbackException) { - assertThat(actual).isEqualTo(tex); + assertThat(actual) + .isExactlyInstanceOf(RuntimeException.class) + .hasCause(tex); } else { assertThat(actual).isEqualTo(ex); @@ -335,7 +337,9 @@ public void cannotCommitTransaction() throws Exception { Mono.from(itb.setName(name)) .as(StepVerifier::create) - .verifyErrorSatisfies(actual -> assertThat(actual).isEqualTo(ex)); + .verifyErrorSatisfies(actual -> assertThat(actual) + .isExactlyInstanceOf(RuntimeException.class) + .hasCause(ex)); // Should have invoked target and changed name diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java index 1381a90e1525..efd07c85df53 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java @@ -119,7 +119,9 @@ public void commitFailureWithMono() { TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); Mono.just(true).as(operator::transactional) .as(StepVerifier::create) - .verifyError(IOException.class); + .verifyErrorSatisfies(ex -> assertThat(ex) + .isExactlyInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(IOException.class)); assertThat(commit.subscribeCount()).isEqualTo(1); rollback.assertWasNotSubscribed(); } @@ -138,7 +140,8 @@ public void rollbackFailureWithMono() { Mono.error(actionFailure).as(operator::transactional) .as(StepVerifier::create) .verifyErrorSatisfies(ex -> assertThat(ex) - .isInstanceOf(IOException.class) + .isExactlyInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(IOException.class) .hasSuppressedException(actionFailure)); commit.assertWasNotSubscribed(); assertThat(rollback.subscribeCount()).isEqualTo(1); @@ -178,7 +181,9 @@ public void commitFailureWithFlux() { Flux.just(1, 2, 3, 4).as(operator::transactional) .as(StepVerifier::create) .expectNextCount(4) - .verifyError(IOException.class); + .verifyErrorSatisfies(ex -> assertThat(ex) + .isExactlyInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(IOException.class)); assertThat(commit.subscribeCount()).isEqualTo(1); rollback.assertWasNotSubscribed(); } @@ -198,7 +203,8 @@ public void rollbackFailureWithFlux() { .as(StepVerifier::create) .expectNextCount(3) .verifyErrorSatisfies(ex -> assertThat(ex) - .isInstanceOf(IOException.class) + .isExactlyInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(IOException.class) .hasSuppressedException(actionFailure)); commit.assertWasNotSubscribed(); assertThat(rollback.subscribeCount()).isEqualTo(1); diff --git a/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt b/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt index 6909ce70e993..d0e97069fb0e 100644 --- a/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt +++ b/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt @@ -218,7 +218,11 @@ abstract class AbstractCoroutinesTransactionAspectTests { } catch (actual: Exception) { if (rollbackException) { - assertThat(actual).hasMessage(tex.message).isInstanceOf(tex::class.java) + assertThat(actual) + .isExactlyInstanceOf(RuntimeException::class.java) + .cause + .isExactlyInstanceOf(RuntimeException::class.java) + .hasCause(tex) } else { assertThat(actual).hasMessage(ex.message).isInstanceOf(ex::class.java) } @@ -291,7 +295,11 @@ abstract class AbstractCoroutinesTransactionAspectTests { itb.setName(name) } catch (actual: Exception) { - assertThat(actual).isEqualTo(ex) + assertThat(actual) + .isExactlyInstanceOf(RuntimeException::class.java) + .cause + .isExactlyInstanceOf(RuntimeException::class.java) + .hasCause(ex) } // Should have invoked target and changed name assertThat(itb.getName()).isEqualTo(name) From e7f9cb8e3ed8710880e791b40dfb0f57960d8dfc Mon Sep 17 00:00:00 2001 From: Enric Sala Date: Sun, 31 Oct 2021 13:02:42 +0100 Subject: [PATCH 3/4] Revert "Option 2: drop the unwrapping, propagate `RuntimeException`" This reverts commit aaad05147b3c6fb820ab6a095454aee1f9c70f12. --- .../R2dbcTransactionManagerUnitTests.java | 8 ++---- .../interceptor/TransactionAspectSupport.java | 26 +++++++++++++++++-- .../reactive/TransactionalOperatorImpl.java | 20 +++++++++++++- ...bstractReactiveTransactionAspectTests.java | 8 ++---- .../reactive/TransactionalOperatorTests.java | 14 +++------- ...bstractCoroutinesTransactionAspectTests.kt | 12 ++------- 6 files changed, 53 insertions(+), 35 deletions(-) diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java index 534f3da13190..870de048e88b 100644 --- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java @@ -264,9 +264,7 @@ void testCommitFails() { .doOnNext(connection -> connection.createStatement("foo")).then() .as(operator::transactional) .as(StepVerifier::create) - .verifyErrorSatisfies(ex -> assertThat(ex) - .isExactlyInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(BadSqlGrammarException.class)); + .verifyError(BadSqlGrammarException.class); verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); @@ -319,9 +317,7 @@ void testRollbackFails() { return ConnectionFactoryUtils.getConnection(connectionFactoryMock) .doOnNext(connection -> connection.createStatement("foo")).then(); }).as(StepVerifier::create) - .verifyErrorSatisfies(ex -> assertThat(ex) - .isExactlyInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(BadSqlGrammarException.class)); + .verifyError(BadSqlGrammarException.class); verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index e23a230cdd70..64c3d3a4fd79 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -932,7 +932,8 @@ public Object invokeWithinTransaction(Method method, @Nullable Class targetCl }, this::commitTransactionAfterReturning, this::completeTransactionAfterThrowing, - this::rollbackTransactionOnCancel)) + this::rollbackTransactionOnCancel) + .onErrorMap(this::unwrapIfResourceCleanupFailure)) .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder()); } @@ -951,7 +952,8 @@ public Object invokeWithinTransaction(Method method, @Nullable Class targetCl }, this::commitTransactionAfterReturning, this::completeTransactionAfterThrowing, - this::rollbackTransactionOnCancel)) + this::rollbackTransactionOnCancel) + .onErrorMap(this::unwrapIfResourceCleanupFailure)) .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder())); } @@ -1033,6 +1035,9 @@ private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactio if (ex2 instanceof TransactionSystemException systemException) { systemException.initApplicationException(ex); } + else { + ex2.addSuppressed(ex); + } return ex2; } ); @@ -1045,6 +1050,9 @@ private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactio if (ex2 instanceof TransactionSystemException systemException) { systemException.initApplicationException(ex); } + else { + ex2.addSuppressed(ex); + } return ex2; } ); @@ -1052,6 +1060,20 @@ private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactio } return Mono.empty(); } + + /** + * Unwrap the cause of a throwable, if produced by a failure + * during the async resource cleanup in {@link Flux#usingWhen}. + * @param ex the throwable to try to unwrap + */ + private Throwable unwrapIfResourceCleanupFailure(Throwable ex) { + if (ex instanceof RuntimeException && + ex.getCause() != null && + ex.getMessage().startsWith("Async resource cleanup failed")) { + return ex.getCause(); + } + return ex; + } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java index f64bd17508eb..e40057c22d51 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java @@ -79,7 +79,8 @@ public Flux execute(TransactionCallback action) throws TransactionExce action::doInTransaction, this.transactionManager::commit, this::rollbackOnException, - this.transactionManager::rollback)) + this.transactionManager::rollback) + .onErrorMap(this::unwrapIfResourceCleanupFailure)) .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder()); } @@ -97,11 +98,28 @@ private Mono rollbackOnException(ReactiveTransaction status, Throwable ex) if (ex2 instanceof TransactionSystemException tse) { tse.initApplicationException(ex); } + else { + ex2.addSuppressed(ex); + } return ex2; } ); } + /** + * Unwrap the cause of a throwable, if produced by a failure + * during the async resource cleanup in {@link Flux#usingWhen}. + * @param ex the throwable to try to unwrap + */ + private Throwable unwrapIfResourceCleanupFailure(Throwable ex) { + if (ex instanceof RuntimeException && + ex.getCause() != null && + ex.getMessage().startsWith("Async resource cleanup failed")) { + return ex.getCause(); + } + return ex; + } + @Override public boolean equals(@Nullable Object other) { diff --git a/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java b/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java index 42c21cf29402..987dc38c6880 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java @@ -256,9 +256,7 @@ public boolean rollbackOn(Throwable t) { .as(StepVerifier::create) .expectErrorSatisfies(actual -> { if (rollbackException) { - assertThat(actual) - .isExactlyInstanceOf(RuntimeException.class) - .hasCause(tex); + assertThat(actual).isEqualTo(tex); } else { assertThat(actual).isEqualTo(ex); @@ -337,9 +335,7 @@ public void cannotCommitTransaction() throws Exception { Mono.from(itb.setName(name)) .as(StepVerifier::create) - .verifyErrorSatisfies(actual -> assertThat(actual) - .isExactlyInstanceOf(RuntimeException.class) - .hasCause(ex)); + .verifyErrorSatisfies(actual -> assertThat(actual).isEqualTo(ex)); // Should have invoked target and changed name diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java index efd07c85df53..1381a90e1525 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java @@ -119,9 +119,7 @@ public void commitFailureWithMono() { TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); Mono.just(true).as(operator::transactional) .as(StepVerifier::create) - .verifyErrorSatisfies(ex -> assertThat(ex) - .isExactlyInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(IOException.class)); + .verifyError(IOException.class); assertThat(commit.subscribeCount()).isEqualTo(1); rollback.assertWasNotSubscribed(); } @@ -140,8 +138,7 @@ public void rollbackFailureWithMono() { Mono.error(actionFailure).as(operator::transactional) .as(StepVerifier::create) .verifyErrorSatisfies(ex -> assertThat(ex) - .isExactlyInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(IOException.class) + .isInstanceOf(IOException.class) .hasSuppressedException(actionFailure)); commit.assertWasNotSubscribed(); assertThat(rollback.subscribeCount()).isEqualTo(1); @@ -181,9 +178,7 @@ public void commitFailureWithFlux() { Flux.just(1, 2, 3, 4).as(operator::transactional) .as(StepVerifier::create) .expectNextCount(4) - .verifyErrorSatisfies(ex -> assertThat(ex) - .isExactlyInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(IOException.class)); + .verifyError(IOException.class); assertThat(commit.subscribeCount()).isEqualTo(1); rollback.assertWasNotSubscribed(); } @@ -203,8 +198,7 @@ public void rollbackFailureWithFlux() { .as(StepVerifier::create) .expectNextCount(3) .verifyErrorSatisfies(ex -> assertThat(ex) - .isExactlyInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(IOException.class) + .isInstanceOf(IOException.class) .hasSuppressedException(actionFailure)); commit.assertWasNotSubscribed(); assertThat(rollback.subscribeCount()).isEqualTo(1); diff --git a/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt b/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt index d0e97069fb0e..6909ce70e993 100644 --- a/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt +++ b/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt @@ -218,11 +218,7 @@ abstract class AbstractCoroutinesTransactionAspectTests { } catch (actual: Exception) { if (rollbackException) { - assertThat(actual) - .isExactlyInstanceOf(RuntimeException::class.java) - .cause - .isExactlyInstanceOf(RuntimeException::class.java) - .hasCause(tex) + assertThat(actual).hasMessage(tex.message).isInstanceOf(tex::class.java) } else { assertThat(actual).hasMessage(ex.message).isInstanceOf(ex::class.java) } @@ -295,11 +291,7 @@ abstract class AbstractCoroutinesTransactionAspectTests { itb.setName(name) } catch (actual: Exception) { - assertThat(actual) - .isExactlyInstanceOf(RuntimeException::class.java) - .cause - .isExactlyInstanceOf(RuntimeException::class.java) - .hasCause(ex) + assertThat(actual).isEqualTo(ex) } // Should have invoked target and changed name assertThat(itb.getName()).isEqualTo(name) From 5f5d68f1c3dcc26bcf2ee5876f37db994e003900 Mon Sep 17 00:00:00 2001 From: Enric Sala Date: Thu, 2 Mar 2023 07:53:41 +0100 Subject: [PATCH 4/4] Fix test `CoroutinesTransactionInterceptorTests` --- .../interceptor/AbstractCoroutinesTransactionAspectTests.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt b/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt index 6909ce70e993..e141188f5bff 100644 --- a/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt +++ b/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt @@ -291,7 +291,8 @@ abstract class AbstractCoroutinesTransactionAspectTests { itb.setName(name) } catch (actual: Exception) { - assertThat(actual).isEqualTo(ex) + assertThat(actual).isInstanceOf(ex.javaClass) + assertThat(actual).hasMessage(ex.message) } // Should have invoked target and changed name assertThat(itb.getName()).isEqualTo(name)