diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionFactoryUtils.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionFactoryUtils.java index 2effb153b8ee..16dbd073034f 100644 --- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionFactoryUtils.java +++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionFactoryUtils.java @@ -87,7 +87,7 @@ public abstract class ConnectionFactoryUtils { */ public static Mono getConnection(ConnectionFactory connectionFactory) { return doGetConnection(connectionFactory) - .onErrorMap(e -> new DataAccessResourceFailureException("Failed to obtain R2DBC Connection", e)); + .onErrorMap(ex -> new DataAccessResourceFailureException("Failed to obtain R2DBC Connection", ex)); } /** @@ -133,10 +133,10 @@ public static Mono doGetConnection(ConnectionFactory connectionFacto synchronizationManager.bindResource(connectionFactory, holderToUse); } }) // Unexpected exception from external delegation call -> close Connection and rethrow. - .onErrorResume(e -> releaseConnection(connection, connectionFactory).then(Mono.error(e)))); + .onErrorResume(ex -> releaseConnection(connection, connectionFactory).then(Mono.error(ex)))); } return con; - }).onErrorResume(NoTransactionException.class, e -> Mono.from(connectionFactory.create())); + }).onErrorResume(NoTransactionException.class, ex -> Mono.from(connectionFactory.create())); } /** @@ -161,7 +161,7 @@ private static Mono fetchConnection(ConnectionFactory connectionFact */ public static Mono releaseConnection(Connection con, ConnectionFactory connectionFactory) { return doReleaseConnection(con, connectionFactory) - .onErrorMap(e -> new DataAccessResourceFailureException("Failed to close R2DBC Connection", e)); + .onErrorMap(ex -> new DataAccessResourceFailureException("Failed to close R2DBC Connection", ex)); } /** @@ -181,7 +181,7 @@ public static Mono doReleaseConnection(Connection connection, ConnectionFa conHolder.released(); } return Mono.from(connection.close()); - }).onErrorResume(NoTransactionException.class, e -> Mono.from(connection.close())); + }).onErrorResume(NoTransactionException.class, ex -> Mono.from(connection.close())); } /** diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java index 7e4a0e49db40..1143aa1a7f74 100644 --- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java +++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java @@ -210,8 +210,7 @@ protected Mono doBegin(TransactionSynchronizationManager synchronizationMa connectionMono = Mono.just(txObject.getConnectionHolder().getConnection()); } - return connectionMono.flatMap(con -> switchAutoCommitIfNecessary(con, transaction) - .then(Mono.from(doBegin(definition, con))) + return connectionMono.flatMap(con -> Mono.from(doBegin(definition, con)) .then(prepareTransactionalConnection(con, definition)) .doOnSuccess(v -> { txObject.getConnectionHolder().setTransactionActive(true); @@ -223,18 +222,15 @@ protected Mono doBegin(TransactionSynchronizationManager synchronizationMa if (txObject.isNewConnectionHolder()) { synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder()); } - }).thenReturn(con).onErrorResume(e -> { + }).thenReturn(con).onErrorResume(ex -> { if (txObject.isNewConnectionHolder()) { return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()) .doOnTerminate(() -> txObject.setConnectionHolder(null, false)) - .then(Mono.error(e)); + .then(Mono.error(ex)); } - return Mono.error(e); - })).onErrorResume(e -> { - CannotCreateTransactionException ex = new CannotCreateTransactionException( - "Could not open R2DBC Connection for transaction", e); return Mono.error(ex); - }); + })).onErrorResume(ex -> Mono.error(new CannotCreateTransactionException( + "Could not open R2DBC Connection for transaction", ex))); }).then(); } @@ -356,20 +352,18 @@ protected Mono doCleanupAfterCompletion(TransactionSynchronizationManager Mono afterCleanup = Mono.empty(); - if (txObject.isMustRestoreAutoCommit()) { - Mono restoreAutoCommitStep = safeCleanupStep( - "doCleanupAfterCompletion when restoring autocommit", Mono.from(con.setAutoCommit(true))); - afterCleanup = afterCleanup.then(restoreAutoCommitStep); - } - Mono releaseConnectionStep = Mono.defer(() -> { try { if (txObject.isNewConnectionHolder()) { if (logger.isDebugEnabled()) { logger.debug("Releasing R2DBC Connection [" + con + "] after transaction"); } - return safeCleanupStep("doCleanupAfterCompletion when releasing R2DBC Connection", - ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory())); + Mono releaseMono = ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()); + if (logger.isDebugEnabled()) { + releaseMono = releaseMono.doOnError( + ex -> logger.debug(String.format("Error ignored during cleanup: %s", ex))); + } + return releaseMono.onErrorComplete(); } } finally { @@ -381,35 +375,6 @@ protected Mono doCleanupAfterCompletion(TransactionSynchronizationManager }); } - private Mono safeCleanupStep(String stepDescription, Mono stepMono) { - if (!logger.isDebugEnabled()) { - return stepMono.onErrorComplete(); - } - else { - return stepMono.doOnError(e -> - logger.debug(String.format("Error ignored during %s: %s", stepDescription, e))) - .onErrorComplete(); - } - } - - private Mono switchAutoCommitIfNecessary(Connection con, Object transaction) { - ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction; - Mono prepare = Mono.empty(); - - // Switch to manual commit if necessary. This is very expensive in some R2DBC drivers, - // so we don't want to do it unnecessarily (for example if we've explicitly - // configured the connection pool to set it already). - if (con.isAutoCommit()) { - txObject.setMustRestoreAutoCommit(true); - if (logger.isDebugEnabled()) { - logger.debug("Switching R2DBC Connection [" + con + "] to manual commit"); - } - prepare = prepare.then(Mono.from(con.setAutoCommit(false))); - } - - return prepare; - } - /** * Prepare the transactional {@link Connection} right after transaction begin. *

The default implementation executes a "SET TRANSACTION READ ONLY" statement if the @@ -531,8 +496,6 @@ private static class ConnectionFactoryTransactionObject { private boolean newConnectionHolder; - private boolean mustRestoreAutoCommit; - void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) { setConnectionHolder(connectionHolder); this.newConnectionHolder = newConnectionHolder; @@ -558,14 +521,6 @@ public ConnectionHolder getConnectionHolder() { public boolean hasConnectionHolder() { return (this.connectionHolder != null); } - - public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) { - this.mustRestoreAutoCommit = mustRestoreAutoCommit; - } - - public boolean isMustRestoreAutoCommit() { - return this.mustRestoreAutoCommit; - } } } diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java index 857744d828b6..b428d8ff7d39 100644 --- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java @@ -23,7 +23,6 @@ import io.r2dbc.spi.ConnectionFactory; import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.R2dbcBadGrammarException; -import io.r2dbc.spi.R2dbcTimeoutException; import io.r2dbc.spi.Statement; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -56,6 +55,7 @@ * Unit tests for {@link R2dbcTransactionManager}. * * @author Mark Paluch + * @author Juergen Hoeller */ class R2dbcTransactionManagerUnitTests { @@ -67,7 +67,7 @@ class R2dbcTransactionManagerUnitTests { @BeforeEach - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) void before() { when(connectionFactoryMock.create()).thenReturn((Mono) Mono.just(connectionMock)); when(connectionMock.beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class))).thenReturn(Mono.empty()); @@ -96,7 +96,6 @@ void testSimpleTransaction() { .verifyComplete(); assertThat(commits).hasValue(1); - verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).commitTransaction(); verify(connectionMock).close(); @@ -185,7 +184,6 @@ void doesNotSetIsolationLevelIfMatch() { @Test void doesNotSetAutoCommitDisabled() { - when(connectionMock.isAutoCommit()).thenReturn(false); when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); @@ -203,29 +201,6 @@ void doesNotSetAutoCommitDisabled() { verify(connectionMock).commitTransaction(); } - @Test - void restoresAutoCommit() { - when(connectionMock.isAutoCommit()).thenReturn(true); - when(connectionMock.setAutoCommit(anyBoolean())).thenReturn(Mono.empty()); - when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); - - DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); - - TransactionalOperator operator = TransactionalOperator.create(tm, definition); - - ConnectionFactoryUtils.getConnection(connectionFactoryMock).as( - operator::transactional) - .as(StepVerifier::create) - .expectNextCount(1) - .verifyComplete(); - - verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); - verify(connectionMock).setAutoCommit(false); - verify(connectionMock).setAutoCommit(true); - verify(connectionMock).commitTransaction(); - verify(connectionMock).close(); - } - @Test void appliesReadOnly() { when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); @@ -246,7 +221,6 @@ void appliesReadOnly() { .expectNextCount(1) .verifyComplete(); - verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).createStatement("SET TRANSACTION READ ONLY"); verify(connectionMock).commitTransaction(); @@ -268,7 +242,6 @@ void testCommitFails() { .as(StepVerifier::create) .verifyError(BadSqlGrammarException.class); - verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).createStatement("foo"); verify(connectionMock).commitTransaction(); @@ -299,7 +272,6 @@ void testRollback() { assertThat(commits).hasValue(0); assertThat(rollbacks).hasValue(1); - verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).rollbackTransaction(); verify(connectionMock).close(); @@ -322,7 +294,6 @@ void testRollbackFails() { }).as(StepVerifier::create) .verifyError(BadSqlGrammarException.class); - verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).createStatement("foo"); verify(connectionMock, never()).commitTransaction(); @@ -338,10 +309,7 @@ void testConnectionReleasedWhenRollbackFails() { TransactionalOperator operator = TransactionalOperator.create(tm); - when(connectionMock.isAutoCommit()).thenReturn(true); - when(connectionMock.setAutoCommit(true)).thenReturn(Mono.defer(() -> Mono.error(new R2dbcTimeoutException("SET AUTOCOMMIT = 1 timed out")))); when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty()); - when(connectionMock.setAutoCommit(false)).thenReturn(Mono.empty()); operator.execute(reactiveTransaction -> ConnectionFactoryUtils.getConnection(connectionFactoryMock) .doOnNext(connection -> { @@ -352,7 +320,6 @@ void testConnectionReleasedWhenRollbackFails() { .hasCause(new R2dbcBadGrammarException("Rollback should fail")) ); - verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock, never()).commitTransaction(); verify(connectionMock).rollbackTransaction(); @@ -380,7 +347,6 @@ void testTransactionSetRollbackOnly() { }).as(StepVerifier::create) .verifyComplete(); - verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).rollbackTransaction(); verify(connectionMock).close();