Skip to content

Commit

Permalink
Avoid unnecessary auto-commit check for proper transaction begin
Browse files Browse the repository at this point in the history
Closes gh-30508
  • Loading branch information
jhoeller committed Jun 3, 2023
1 parent d290625 commit 9751987
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public abstract class ConnectionFactoryUtils {
*/
public static Mono<Connection> getConnection(ConnectionFactory connectionFactory) {
return doGetConnection(connectionFactory)
.onErrorMap(e -> new DataAccessResourceFailureException("Failed to obtain R2DBC Connection", e));
.onErrorMap(ex -> new DataAccessResourceFailureException("Failed to obtain R2DBC Connection", ex));
}

/**
Expand Down Expand Up @@ -133,10 +133,10 @@ public static Mono<Connection> doGetConnection(ConnectionFactory connectionFacto
synchronizationManager.bindResource(connectionFactory, holderToUse);
}
}) // Unexpected exception from external delegation call -> close Connection and rethrow.
.onErrorResume(e -> releaseConnection(connection, connectionFactory).then(Mono.error(e))));
.onErrorResume(ex -> releaseConnection(connection, connectionFactory).then(Mono.error(ex))));
}
return con;
}).onErrorResume(NoTransactionException.class, e -> Mono.from(connectionFactory.create()));
}).onErrorResume(NoTransactionException.class, ex -> Mono.from(connectionFactory.create()));
}

/**
Expand All @@ -161,7 +161,7 @@ private static Mono<Connection> fetchConnection(ConnectionFactory connectionFact
*/
public static Mono<Void> releaseConnection(Connection con, ConnectionFactory connectionFactory) {
return doReleaseConnection(con, connectionFactory)
.onErrorMap(e -> new DataAccessResourceFailureException("Failed to close R2DBC Connection", e));
.onErrorMap(ex -> new DataAccessResourceFailureException("Failed to close R2DBC Connection", ex));
}

/**
Expand All @@ -181,7 +181,7 @@ public static Mono<Void> doReleaseConnection(Connection connection, ConnectionFa
conHolder.released();
}
return Mono.from(connection.close());
}).onErrorResume(NoTransactionException.class, e -> Mono.from(connection.close()));
}).onErrorResume(NoTransactionException.class, ex -> Mono.from(connection.close()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,7 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa
connectionMono = Mono.just(txObject.getConnectionHolder().getConnection());
}

return connectionMono.flatMap(con -> switchAutoCommitIfNecessary(con, transaction)
.then(Mono.from(doBegin(definition, con)))
return connectionMono.flatMap(con -> Mono.from(doBegin(definition, con))
.then(prepareTransactionalConnection(con, definition))
.doOnSuccess(v -> {
txObject.getConnectionHolder().setTransactionActive(true);
Expand All @@ -223,18 +222,15 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa
if (txObject.isNewConnectionHolder()) {
synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder());
}
}).thenReturn(con).onErrorResume(e -> {
}).thenReturn(con).onErrorResume(ex -> {
if (txObject.isNewConnectionHolder()) {
return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory())
.doOnTerminate(() -> txObject.setConnectionHolder(null, false))
.then(Mono.error(e));
.then(Mono.error(ex));
}
return Mono.error(e);
})).onErrorResume(e -> {
CannotCreateTransactionException ex = new CannotCreateTransactionException(
"Could not open R2DBC Connection for transaction", e);
return Mono.error(ex);
});
})).onErrorResume(ex -> Mono.error(new CannotCreateTransactionException(
"Could not open R2DBC Connection for transaction", ex)));
}).then();
}

Expand Down Expand Up @@ -356,20 +352,18 @@ protected Mono<Void> doCleanupAfterCompletion(TransactionSynchronizationManager

Mono<Void> afterCleanup = Mono.empty();

if (txObject.isMustRestoreAutoCommit()) {
Mono<Void> restoreAutoCommitStep = safeCleanupStep(
"doCleanupAfterCompletion when restoring autocommit", Mono.from(con.setAutoCommit(true)));
afterCleanup = afterCleanup.then(restoreAutoCommitStep);
}

Mono<Void> releaseConnectionStep = Mono.defer(() -> {
try {
if (txObject.isNewConnectionHolder()) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing R2DBC Connection [" + con + "] after transaction");
}
return safeCleanupStep("doCleanupAfterCompletion when releasing R2DBC Connection",
ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()));
Mono<Void> releaseMono = ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory());
if (logger.isDebugEnabled()) {
releaseMono = releaseMono.doOnError(
ex -> logger.debug(String.format("Error ignored during cleanup: %s", ex)));
}
return releaseMono.onErrorComplete();
}
}
finally {
Expand All @@ -381,35 +375,6 @@ protected Mono<Void> doCleanupAfterCompletion(TransactionSynchronizationManager
});
}

private Mono<Void> safeCleanupStep(String stepDescription, Mono<Void> stepMono) {
if (!logger.isDebugEnabled()) {
return stepMono.onErrorComplete();
}
else {
return stepMono.doOnError(e ->
logger.debug(String.format("Error ignored during %s: %s", stepDescription, e)))
.onErrorComplete();
}
}

private Mono<Void> switchAutoCommitIfNecessary(Connection con, Object transaction) {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
Mono<Void> prepare = Mono.empty();

// Switch to manual commit if necessary. This is very expensive in some R2DBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.isAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching R2DBC Connection [" + con + "] to manual commit");
}
prepare = prepare.then(Mono.from(con.setAutoCommit(false)));
}

return prepare;
}

/**
* Prepare the transactional {@link Connection} right after transaction begin.
* <p>The default implementation executes a "SET TRANSACTION READ ONLY" statement if the
Expand Down Expand Up @@ -531,8 +496,6 @@ private static class ConnectionFactoryTransactionObject {

private boolean newConnectionHolder;

private boolean mustRestoreAutoCommit;

void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {
setConnectionHolder(connectionHolder);
this.newConnectionHolder = newConnectionHolder;
Expand All @@ -558,14 +521,6 @@ public ConnectionHolder getConnectionHolder() {
public boolean hasConnectionHolder() {
return (this.connectionHolder != null);
}

public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) {
this.mustRestoreAutoCommit = mustRestoreAutoCommit;
}

public boolean isMustRestoreAutoCommit() {
return this.mustRestoreAutoCommit;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.R2dbcBadGrammarException;
import io.r2dbc.spi.R2dbcTimeoutException;
import io.r2dbc.spi.Statement;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -56,6 +55,7 @@
* Unit tests for {@link R2dbcTransactionManager}.
*
* @author Mark Paluch
* @author Juergen Hoeller
*/
class R2dbcTransactionManagerUnitTests {

Expand All @@ -67,7 +67,7 @@ class R2dbcTransactionManagerUnitTests {


@BeforeEach
@SuppressWarnings({ "unchecked", "rawtypes" })
@SuppressWarnings({"unchecked", "rawtypes"})
void before() {
when(connectionFactoryMock.create()).thenReturn((Mono) Mono.just(connectionMock));
when(connectionMock.beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class))).thenReturn(Mono.empty());
Expand Down Expand Up @@ -96,7 +96,6 @@ void testSimpleTransaction() {
.verifyComplete();

assertThat(commits).hasValue(1);
verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock).commitTransaction();
verify(connectionMock).close();
Expand Down Expand Up @@ -185,7 +184,6 @@ void doesNotSetIsolationLevelIfMatch() {

@Test
void doesNotSetAutoCommitDisabled() {
when(connectionMock.isAutoCommit()).thenReturn(false);
when(connectionMock.commitTransaction()).thenReturn(Mono.empty());

DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
Expand All @@ -203,29 +201,6 @@ void doesNotSetAutoCommitDisabled() {
verify(connectionMock).commitTransaction();
}

@Test
void restoresAutoCommit() {
when(connectionMock.isAutoCommit()).thenReturn(true);
when(connectionMock.setAutoCommit(anyBoolean())).thenReturn(Mono.empty());
when(connectionMock.commitTransaction()).thenReturn(Mono.empty());

DefaultTransactionDefinition definition = new DefaultTransactionDefinition();

TransactionalOperator operator = TransactionalOperator.create(tm, definition);

ConnectionFactoryUtils.getConnection(connectionFactoryMock).as(
operator::transactional)
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();

verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock).setAutoCommit(false);
verify(connectionMock).setAutoCommit(true);
verify(connectionMock).commitTransaction();
verify(connectionMock).close();
}

@Test
void appliesReadOnly() {
when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
Expand All @@ -246,7 +221,6 @@ void appliesReadOnly() {
.expectNextCount(1)
.verifyComplete();

verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock).createStatement("SET TRANSACTION READ ONLY");
verify(connectionMock).commitTransaction();
Expand All @@ -268,7 +242,6 @@ void testCommitFails() {
.as(StepVerifier::create)
.verifyError(BadSqlGrammarException.class);

verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock).createStatement("foo");
verify(connectionMock).commitTransaction();
Expand Down Expand Up @@ -299,7 +272,6 @@ void testRollback() {

assertThat(commits).hasValue(0);
assertThat(rollbacks).hasValue(1);
verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock).rollbackTransaction();
verify(connectionMock).close();
Expand All @@ -322,7 +294,6 @@ void testRollbackFails() {
}).as(StepVerifier::create)
.verifyError(BadSqlGrammarException.class);

verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock).createStatement("foo");
verify(connectionMock, never()).commitTransaction();
Expand All @@ -338,10 +309,7 @@ void testConnectionReleasedWhenRollbackFails() {

TransactionalOperator operator = TransactionalOperator.create(tm);

when(connectionMock.isAutoCommit()).thenReturn(true);
when(connectionMock.setAutoCommit(true)).thenReturn(Mono.defer(() -> Mono.error(new R2dbcTimeoutException("SET AUTOCOMMIT = 1 timed out"))));
when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty());
when(connectionMock.setAutoCommit(false)).thenReturn(Mono.empty());

operator.execute(reactiveTransaction -> ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.doOnNext(connection -> {
Expand All @@ -352,7 +320,6 @@ void testConnectionReleasedWhenRollbackFails() {
.hasCause(new R2dbcBadGrammarException("Rollback should fail"))
);

verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock, never()).commitTransaction();
verify(connectionMock).rollbackTransaction();
Expand Down Expand Up @@ -380,7 +347,6 @@ void testTransactionSetRollbackOnly() {
}).as(StepVerifier::create)
.verifyComplete();

verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
verify(connectionMock).rollbackTransaction();
verify(connectionMock).close();
Expand Down

0 comments on commit 9751987

Please sign in to comment.