diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlTransactionDefinition.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlTransactionDefinition.java index fab2bcc15..4abc58e36 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlTransactionDefinition.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlTransactionDefinition.java @@ -37,35 +37,34 @@ public final class MySqlTransactionDefinition implements TransactionDefinition { /** - * Use {@code WITH CONSISTENT SNAPSHOT} syntax, all MySQL-compatible servers should support this syntax. + * Use {@code WITH CONSISTENT SNAPSHOT} property. + *

* The option starts a consistent read for storage engines such as InnoDB and XtraDB that can do so, the * same as if a {@code START TRANSACTION} followed by a {@code SELECT ...} from any InnoDB table was * issued. - *

- * NOTICE: This option and {@link #READ_ONLY} cannot be enabled at the same definition. */ public static final Option WITH_CONSISTENT_SNAPSHOT = Option.valueOf("withConsistentSnapshot"); /** - * Use {@code START TRANSACTION WITH CONSISTENT [engine] SNAPSHOT} for Facebook/MySQL or similar syntax. - * Only available when {@link #WITH_CONSISTENT_SNAPSHOT} is set to {@code true}. + * Use {@code WITH CONSISTENT [engine] SNAPSHOT} for Facebook/MySQL or similar property. Only available + * when {@link #WITH_CONSISTENT_SNAPSHOT} is set to {@code true}. *

- * NOTICE: This is an extended syntax for special servers. Before using it, check whether the server - * supports the syntax. + * Note: This is an extended syntax based on specific distributions. Please check whether the server + * supports this property before using it. */ public static final Option CONSISTENT_SNAPSHOT_ENGINE = Option.valueOf("consistentSnapshotEngine"); /** - * Use {@code START TRANSACTION WITH CONSISTENT SNAPSHOT FROM SESSION [session_id]} for Percona/MySQL or - * similar syntax. Only available when {@link #WITH_CONSISTENT_SNAPSHOT} is set to {@code true}. + * Use {@code WITH CONSISTENT SNAPSHOT FROM SESSION [session_id]} for Percona/MySQL or similar property. + * Only available when {@link #WITH_CONSISTENT_SNAPSHOT} is set to {@code true}. *

- * The {@code session_id} is the session identifier reported in the {@code Id} column of the process list. - * Reported by {@code SHOW COLUMNS FROM performance_schema.processlist}, it should be an unsigned 64-bit - * integer. Use {@code SHOW PROCESSLIST} to find session identifier of the process list. + * The {@code session_id} is received by {@code SHOW COLUMNS FROM performance_schema.processlist}, it + * should be an unsigned 64-bit integer. Use {@code SHOW PROCESSLIST} to find session identifier of the + * process list. *

- * NOTICE: This is an extended syntax for special servers. Before using it, check whether the server - * supports the syntax. + * Note: This is an extended syntax based on specific distributions. Please check whether the server + * supports this property before using it. */ public static final Option CONSISTENT_SNAPSHOT_FROM_SESSION = Option.valueOf("consistentSnapshotFromSession"); diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java index cec6a9876..afb23fb55 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java @@ -1281,20 +1281,28 @@ protected boolean process(int task, SynchronousSink sink) { return false; } - private static String buildStartTransaction(TransactionDefinition definition) { + /** + * Visible for testing. + * + * @param definition the transaction definition + * @return the {@code START TRANSACTION} statement + */ + static String buildStartTransaction(TransactionDefinition definition) { Boolean readOnly = definition.getAttribute(TransactionDefinition.READ_ONLY); Boolean snapshot = definition.getAttribute(MySqlTransactionDefinition.WITH_CONSISTENT_SNAPSHOT); - if (readOnly == null && (snapshot == null || !snapshot)) { + if (readOnly == null && !Boolean.TRUE.equals(snapshot)) { return "BEGIN"; } StringBuilder builder = new StringBuilder(90).append("START TRANSACTION"); + boolean first = true; - if (snapshot != null && snapshot) { + if (Boolean.TRUE.equals(snapshot)) { ConsistentSnapshotEngine engine = definition.getAttribute(MySqlTransactionDefinition.CONSISTENT_SNAPSHOT_ENGINE); + first = false; builder.append(" WITH CONSISTENT "); if (engine == null) { @@ -1312,6 +1320,10 @@ private static String buildStartTransaction(TransactionDefinition definition) { } if (readOnly != null) { + if (!first) { + builder.append(','); + } + if (readOnly) { builder.append(" READ ONLY"); } else { diff --git a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java index 48d63c0ed..50c4d5122 100644 --- a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java +++ b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java @@ -18,7 +18,9 @@ import io.r2dbc.spi.ColumnMetadata; import io.r2dbc.spi.R2dbcPermissionDeniedException; +import io.r2dbc.spi.TransactionDefinition; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIf; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.testcontainers.shaded.com.fasterxml.jackson.core.JsonProcessingException; @@ -71,6 +73,30 @@ void isInTransaction() { .doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isFalse())); } + @DisabledIf("envIsLessThanMySql56") + @Test + void startTransaction() { + TransactionDefinition readOnlyConsistent = MySqlTransactionDefinition.builder() + .withConsistentSnapshot(true) + .readOnly(true) + .build(); + TransactionDefinition readWriteConsistent = MySqlTransactionDefinition.builder() + .withConsistentSnapshot(true) + .readOnly(false) + .build(); + + complete(connection -> Mono.fromRunnable(() -> assertThat(connection.isInTransaction()) + .isFalse()) + .then(connection.beginTransaction(readOnlyConsistent)) + .doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue()) + .then(connection.rollbackTransaction()) + .doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isFalse()) + .then(connection.beginTransaction(readWriteConsistent)) + .doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue()) + .then(connection.rollbackTransaction()) + .doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isFalse())); + } + @Test void autoRollbackPreRelease() { // Mock pool allocate/release. diff --git a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/StartTransactionStateTest.java b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/StartTransactionStateTest.java new file mode 100644 index 000000000..14a878d91 --- /dev/null +++ b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/StartTransactionStateTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2024 asyncer.io projects + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.asyncer.r2dbc.mysql; + +import io.r2dbc.spi.IsolationLevel; +import io.r2dbc.spi.TransactionDefinition; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link StartTransactionState}. + */ +class StartTransactionStateTest { + + @ParameterizedTest + @MethodSource + void buildStartTransaction(TransactionDefinition definition, String excepted) { + assertThat(StartTransactionState.buildStartTransaction(definition)).isEqualTo(excepted); + } + + static Stream buildStartTransaction() { + return Stream.of( + Arguments.of(MySqlTransactionDefinition.empty(), "BEGIN"), + Arguments.of(MySqlTransactionDefinition.builder() + .isolationLevel(IsolationLevel.READ_UNCOMMITTED) + .build(), "BEGIN"), + Arguments.of(MySqlTransactionDefinition.builder() + .readOnly(true) + .build(), "START TRANSACTION READ ONLY"), + Arguments.of(MySqlTransactionDefinition.builder() + .readOnly(false) + .build(), "START TRANSACTION READ WRITE"), + Arguments.of(MySqlTransactionDefinition.builder() + .withConsistentSnapshot(true) + .build(), "START TRANSACTION WITH CONSISTENT SNAPSHOT"), + Arguments.of(MySqlTransactionDefinition.builder() + .withConsistentSnapshot(true) + .readOnly(true) + .build(), "START TRANSACTION WITH CONSISTENT SNAPSHOT, READ ONLY"), + Arguments.of(MySqlTransactionDefinition.builder() + .withConsistentSnapshot(true) + .readOnly(false) + .build(), "START TRANSACTION WITH CONSISTENT SNAPSHOT, READ WRITE"), + Arguments.of(MySqlTransactionDefinition.builder() + .withConsistentSnapshot(true) + .consistentSnapshotEngine(ConsistentSnapshotEngine.ROCKSDB) + .consistentSnapshotFromSession(3L) + .readOnly(true) + .build(), "START TRANSACTION WITH CONSISTENT ROCKSDB SNAPSHOT FROM SESSION 3, READ ONLY") + ); + } +}