diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionStrategy.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionStrategy.java
new file mode 100644
index 000000000..d90acd054
--- /dev/null
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionStrategy.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2024 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql;
+
+import io.asyncer.r2dbc.mysql.client.Client;
+import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
+import io.asyncer.r2dbc.mysql.internal.util.StringUtils;
+import io.netty.channel.ChannelOption;
+import io.netty.resolver.AddressResolver;
+import io.netty.resolver.AddressResolverGroup;
+import io.netty.resolver.DefaultNameResolver;
+import io.netty.resolver.RoundRobinInetAddressResolver;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+import io.r2dbc.spi.R2dbcNonTransientResourceException;
+import org.jetbrains.annotations.Nullable;
+import reactor.core.publisher.Mono;
+import reactor.netty.resources.LoopResources;
+import reactor.netty.tcp.TcpClient;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * An interface of a connection strategy that considers how to obtain a MySQL {@link Client} object.
+ *
+ * @since 1.2.0
+ */
+@FunctionalInterface
+interface ConnectionStrategy {
+
+ InternalLogger logger = InternalLoggerFactory.getInstance(ConnectionStrategy.class);
+
+ /**
+ * Establish a connection to a target server that is determined by this connection strategy.
+ *
+ * @return a logged-in {@link Client} object.
+ */
+ Mono extends Client> connect();
+
+ /**
+ * Creates a general-purpose {@link TcpClient} with the given {@link SocketClientConfiguration}.
+ *
+ * Note: Unix Domain Socket also uses this method to create a general-purpose {@link TcpClient client}.
+ *
+ * @param configuration socket client configuration.
+ * @return a general-purpose {@link TcpClient client}.
+ */
+ static TcpClient createTcpClient(SocketClientConfiguration configuration, boolean balancedDns) {
+ LoopResources loopResources = configuration.getLoopResources();
+ Duration connectTimeout = configuration.getConnectTimeout();
+ TcpClient client = TcpClient.newConnection();
+
+ if (loopResources != null) {
+ client = client.runOn(loopResources);
+ }
+
+ if (connectTimeout != null) {
+ client = client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis()));
+ }
+
+ if (balancedDns) {
+ client = client.resolver(BalancedResolverGroup.INSTANCE);
+ }
+
+ return client;
+ }
+
+ /**
+ * Logins to a MySQL server with the given {@link TcpClient}, {@link Credential} and configurations.
+ *
+ * @param tcpClient a TCP client to connect to a MySQL server.
+ * @param credential user and password to log in to a MySQL server.
+ * @param configuration a configuration that affects login behavior.
+ * @return a logged-in {@link Client} object.
+ */
+ static Mono connectWithInit(
+ TcpClient tcpClient,
+ Credential credential,
+ MySqlConnectionConfiguration configuration
+ ) {
+ return Mono.fromSupplier(() -> {
+ String timeZone = configuration.getConnectionTimeZone();
+ ZoneId connectionTimeZone;
+ if ("LOCAL".equalsIgnoreCase(timeZone)) {
+ connectionTimeZone = ZoneId.systemDefault().normalized();
+ } else if ("SERVER".equalsIgnoreCase(timeZone)) {
+ connectionTimeZone = null;
+ } else {
+ connectionTimeZone = StringUtils.parseZoneId(timeZone);
+ }
+
+ return new ConnectionContext(
+ configuration.getZeroDateOption(),
+ configuration.getLoadLocalInfilePath(),
+ configuration.getLocalInfileBufferSize(),
+ configuration.isPreserveInstants(),
+ connectionTimeZone
+ );
+ }).flatMap(ctx -> ReactorNettyClient.connect(tcpClient, configuration.getSsl(), ctx)).flatMap(client -> {
+ // Lazy init database after handshake/login
+ MySqlSslConfiguration ssl = configuration.getSsl();
+ String loginDb = configuration.isCreateDatabaseIfNotExist() ? "" : configuration.getDatabase();
+
+ return InitFlow.initHandshake(
+ client,
+ ssl.getSslMode(),
+ loginDb,
+ credential.getUser(),
+ credential.getPassword(),
+ configuration.getCompressionAlgorithms(),
+ configuration.getZstdCompressionLevel()
+ ).then(Mono.just(client)).onErrorResume(e -> client.forceClose().then(Mono.error(e)));
+ });
+ }
+
+ /**
+ * Creates an exception that indicates a retry failure.
+ *
+ * @param message the message of the exception.
+ * @param cause the last exception that caused the retry.
+ * @return a retry failure exception.
+ */
+ static R2dbcNonTransientResourceException retryFail(String message, @Nullable Throwable cause) {
+ return new R2dbcNonTransientResourceException(
+ message,
+ "H1000",
+ 9000,
+ cause
+ );
+ }
+
+ /**
+ * Connect and login to a MySQL server with a specific TCP socket address.
+ *
+ * @since 1.2.0
+ */
+ final class InetConnectFunction implements Function, Mono> {
+
+ private final boolean balancedDns;
+
+ private final boolean tcpKeepAlive;
+
+ private final boolean tcpNoDelay;
+
+ private final Credential credential;
+
+ private final MySqlConnectionConfiguration configuration;
+
+ InetConnectFunction(
+ boolean balancedDns,
+ boolean tcpKeepAlive,
+ boolean tcpNoDelay,
+ Credential credential,
+ MySqlConnectionConfiguration configuration
+ ) {
+ this.balancedDns = balancedDns;
+ this.tcpKeepAlive = tcpKeepAlive;
+ this.tcpNoDelay = tcpNoDelay;
+ this.credential = credential;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public Mono apply(Supplier address) {
+ TcpClient client = ConnectionStrategy.createTcpClient(configuration.getClient(), balancedDns)
+ .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
+ .option(ChannelOption.TCP_NODELAY, tcpNoDelay)
+ .remoteAddress(address);
+
+ return ConnectionStrategy.connectWithInit(client, credential, configuration);
+ }
+ }
+
+ /**
+ * Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
+ *
+ * @since 1.2.0
+ */
+ final class BalancedResolverGroup extends AddressResolverGroup {
+
+ BalancedResolverGroup() {
+ }
+
+ public static final BalancedResolverGroup INSTANCE;
+
+ static {
+ INSTANCE = new BalancedResolverGroup();
+ Runtime.getRuntime().addShutdownHook(new Thread(
+ INSTANCE::close,
+ "R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
+ ));
+ }
+
+ @Override
+ protected AddressResolver newResolver(EventExecutor executor) {
+ return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver();
+ }
+ }
+}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/Credential.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/Credential.java
new file mode 100644
index 000000000..82cb1168d
--- /dev/null
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/Credential.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2024 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Objects;
+
+/**
+ * A value object representing a user with an optional password.
+ */
+final class Credential {
+
+ private final String user;
+
+ @Nullable
+ private final CharSequence password;
+
+ Credential(String user, @Nullable CharSequence password) {
+ this.user = user;
+ this.password = password;
+ }
+
+ String getUser() {
+ return user;
+ }
+
+ @Nullable
+ CharSequence getPassword() {
+ return password;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Credential)) {
+ return false;
+ }
+
+ Credential that = (Credential) o;
+
+ return user.equals(that.user) && Objects.equals(password, that.password);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * user.hashCode() + Objects.hashCode(password);
+ }
+
+ @Override
+ public String toString() {
+ return "Credential{user=" + user + ", password=REDACTED}";
+ }
+}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/InitFlow.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/InitFlow.java
index a7c13c596..8e1b06f1f 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/InitFlow.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/InitFlow.java
@@ -22,6 +22,7 @@
import io.asyncer.r2dbc.mysql.cache.PrepareCache;
import io.asyncer.r2dbc.mysql.client.Client;
import io.asyncer.r2dbc.mysql.client.FluxExchangeable;
+import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
import io.asyncer.r2dbc.mysql.codec.Codecs;
import io.asyncer.r2dbc.mysql.codec.CodecsBuilder;
import io.asyncer.r2dbc.mysql.constant.CompressionAlgorithm;
@@ -75,7 +76,7 @@
import java.util.function.Function;
/**
- * A message flow utility that can initializes the session of {@link Client}.
+ * A message flow utility that can initializes the session of {@link ReactorNettyClient}.
*
* It should not use server-side prepared statements, because {@link PrepareCache} will be initialized after the session
* is initialized.
@@ -117,9 +118,9 @@ final class InitFlow {
};
/**
- * Initializes handshake and login a {@link Client}.
+ * Initializes handshake and login a {@link ReactorNettyClient}.
*
- * @param client the {@link Client} to exchange messages with.
+ * @param client the {@link ReactorNettyClient} to exchange messages with.
* @param sslMode the {@link SslMode} defines SSL capability and behavior.
* @param database the database that will be connected.
* @param user the user that will be login.
@@ -128,7 +129,7 @@ final class InitFlow {
* @param zstdCompressionLevel the zstd compression level.
* @return a {@link Mono} that indicates the initialization is done, or an error if the initialization failed.
*/
- static Mono initHandshake(Client client, SslMode sslMode, String database, String user,
+ static Mono initHandshake(ReactorNettyClient client, SslMode sslMode, String database, String user,
@Nullable CharSequence password, Set compressionAlgorithms, int zstdCompressionLevel) {
return client.exchange(new HandshakeExchangeable(
client,
@@ -488,7 +489,7 @@ final class HandshakeExchangeable extends FluxExchangeable {
private final Sinks.Many requests = Sinks.many().unicast()
.onBackpressureBuffer(Queues.one().get());
- private final Client client;
+ private final ReactorNettyClient client;
private final SslMode sslMode;
@@ -511,7 +512,7 @@ final class HandshakeExchangeable extends FluxExchangeable {
private boolean sslCompleted;
- HandshakeExchangeable(Client client, SslMode sslMode, String database, String user,
+ HandshakeExchangeable(ReactorNettyClient client, SslMode sslMode, String database, String user,
@Nullable CharSequence password, Set compressions,
int zstdCompressionLevel) {
this.client = client;
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MultiHostsConnectionStrategy.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MultiHostsConnectionStrategy.java
new file mode 100644
index 000000000..fc7071505
--- /dev/null
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MultiHostsConnectionStrategy.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2024 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql;
+
+import io.asyncer.r2dbc.mysql.client.Client;
+import io.asyncer.r2dbc.mysql.client.FailoverClient;
+import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
+import io.asyncer.r2dbc.mysql.constant.ProtocolDriver;
+import io.asyncer.r2dbc.mysql.internal.NodeAddress;
+import io.asyncer.r2dbc.mysql.internal.util.InternalArrays;
+import io.netty.resolver.DefaultNameResolver;
+import io.netty.resolver.NameResolver;
+import io.netty.util.concurrent.Future;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.netty.resources.LoopResources;
+import reactor.netty.tcp.TcpResources;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * An abstraction for {@link ConnectionStrategy} that consider multiple hosts.
+ */
+final class MultiHostsConnectionStrategy implements ConnectionStrategy {
+
+ private final Mono extends Client> client;
+
+ MultiHostsConnectionStrategy(
+ MySqlConnectionConfiguration configuration,
+ List addresses,
+ ProtocolDriver driver,
+ int retriesAllDown,
+ boolean shuffle,
+ boolean tcpKeepAlive,
+ boolean tcpNoDelay
+ ) {
+ Mono client = configuration.getCredential().flatMap(credential -> {
+ if (ProtocolDriver.DNS_SRV.equals(driver)) {
+ logger.debug("Resolve hosts via DNS SRV: {}", addresses);
+
+ LoopResources resources = configuration.getClient().getLoopResources();
+ LoopResources loopResources = resources == null ? TcpResources.get() : resources;
+ InetConnectFunction login = new InetConnectFunction(
+ false,
+ tcpKeepAlive,
+ tcpNoDelay,
+ credential,
+ configuration
+ );
+
+ return resolveAllHosts(loopResources, addresses, shuffle).flatMap(addrs -> {
+ logger.debug("Connect to multiple addresses: {}", addrs);
+
+ return connectHost(
+ addrs,
+ login,
+ shuffle,
+ 0,
+ retriesAllDown
+ );
+ });
+ } else {
+ List availableHosts = copyAvailableAddresses(addresses, shuffle);
+ logger.debug("Connect to multiple hosts: {}", availableHosts);
+
+ int size = availableHosts.size();
+ InetSocketAddress[] array = new InetSocketAddress[availableHosts.size()];
+
+ for (int i = 0; i < size; i++) {
+ array[i] = availableHosts.get(i).toUnresolved();
+ }
+
+ List addrs = InternalArrays.asImmutableList(array);
+ InetConnectFunction login = new InetConnectFunction(
+ true,
+ tcpKeepAlive,
+ tcpNoDelay,
+ credential,
+ configuration
+ );
+
+ return connectHost(
+ addrs,
+ login,
+ shuffle,
+ 0,
+ retriesAllDown
+ );
+ }
+ });
+
+ this.client = client.map(c -> new FailoverClient(c, client));
+ }
+
+ @Override
+ public Mono extends Client> connect() {
+ return client;
+ }
+
+ private static Mono connectHost(
+ List addresses,
+ InetConnectFunction login,
+ boolean shuffle,
+ int attempts,
+ int maxAttempts
+ ) {
+ Iterator iter = addresses.iterator();
+
+ if (!iter.hasNext()) {
+ return Mono.error(ConnectionStrategy.retryFail("Fail to establish connection: no available host", null));
+ }
+
+
+ InetSocketAddress address = iter.next();
+
+ return login.apply(() -> address).onErrorResume(error -> resumeConnect(
+ error,
+ address,
+ addresses,
+ iter,
+ login,
+ shuffle,
+ attempts,
+ maxAttempts
+ ));
+ }
+
+ private static Mono resumeConnect(
+ Throwable t,
+ InetSocketAddress failed,
+ List addresses,
+ Iterator iter,
+ InetConnectFunction login,
+ boolean shuffle,
+ int attempts,
+ int maxAttempts
+ ) {
+ logger.warn("Fail to connect to {}", failed, t);
+
+ if (!iter.hasNext()) {
+ // The last host failed to connect
+ if (attempts >= maxAttempts) {
+ return Mono.error(ConnectionStrategy.retryFail(
+ "Fail to establish connections, retried " + attempts + " times", t));
+ }
+
+ logger.warn("All hosts failed to establish connections, auto-try again after 250ms.", t);
+
+ // Ignore waiting error, e.g. interrupted, scheduler rejected
+ return Mono.delay(Duration.ofMillis(250))
+ .onErrorComplete()
+ .then(Mono.defer(() -> connectHost(
+ addresses,
+ login,
+ shuffle,
+ attempts + 1,
+ maxAttempts
+ )));
+ }
+
+ InetSocketAddress address = iter.next();
+
+ return login.apply(() -> address).onErrorResume(error -> resumeConnect(
+ error,
+ address,
+ addresses,
+ iter,
+ login,
+ shuffle,
+ attempts,
+ maxAttempts
+ ));
+ }
+
+ private static Mono> resolveAllHosts(
+ LoopResources loopResources,
+ List addresses,
+ boolean shuffle
+ ) {
+ // Or DnsNameResolver? It is non-blocking but requires native dependencies, hard configurations, and maybe
+ // behaves differently. Currently, we use DefaultNameResolver which is blocking but simple and easy to use.
+ @SuppressWarnings("resource")
+ DefaultNameResolver resolver = new DefaultNameResolver(loopResources.onClient(true).next());
+
+ return Flux.fromIterable(addresses)
+ .flatMap(address -> resolveAll(resolver, address.getHost())
+ .flatMapIterable(Function.identity())
+ .map(inet -> new InetSocketAddress(inet, address.getPort())))
+ .doFinally(ignore -> resolver.close())
+ .collectList()
+ .map(list -> {
+ if (shuffle) {
+ Collections.shuffle(list);
+ }
+
+ return list;
+ });
+ }
+
+ private static Mono> resolveAll(NameResolver resolver, String host) {
+ Future> future = resolver.resolveAll(host);
+
+ return Mono.>create(sink -> future.addListener(f -> {
+ if (f.isSuccess()) {
+ try {
+ @SuppressWarnings("unchecked")
+ List t = (List) f.getNow();
+
+ logger.debug("Resolve {} in DNS succeed, {} records", host, t.size());
+ sink.success(t);
+ } catch (Throwable e) {
+ logger.warn("Resolve {} in DNS succeed but failed to get result", host, e);
+ sink.success(Collections.emptyList());
+ }
+ } else {
+ logger.warn("Resolve {} in DNS failed", host, f.cause());
+ sink.success(Collections.emptyList());
+ }
+ })).doOnCancel(() -> future.cancel(false));
+ }
+
+ private static List copyAvailableAddresses(List addresses, boolean shuffle) {
+ if (shuffle) {
+ List copied = new ArrayList<>(addresses);
+ Collections.shuffle(copied);
+ return copied;
+ }
+
+ return InternalArrays.asImmutableList(addresses.toArray(new NodeAddress[0]));
+ }
+}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlBatchingBatch.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlBatchingBatch.java
index a71c31986..c2517e29b 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlBatchingBatch.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlBatchingBatch.java
@@ -27,8 +27,8 @@
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
/**
- * An implementation of {@link MySqlBatch} for executing a collection of statements in a batch against the
- * MySQL database.
+ * An implementation of {@link MySqlBatch} for executing a collection of statements in a batch against the MySQL
+ * database.
*/
final class MySqlBatchingBatch implements MySqlBatch {
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java
index 3856b58bd..fb4ba179a 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java
@@ -17,13 +17,17 @@
package io.asyncer.r2dbc.mysql;
import io.asyncer.r2dbc.mysql.constant.CompressionAlgorithm;
+import io.asyncer.r2dbc.mysql.constant.HaProtocol;
+import io.asyncer.r2dbc.mysql.constant.ProtocolDriver;
import io.asyncer.r2dbc.mysql.constant.SslMode;
import io.asyncer.r2dbc.mysql.constant.ZeroDateOption;
import io.asyncer.r2dbc.mysql.extension.Extension;
import io.asyncer.r2dbc.mysql.internal.util.InternalArrays;
+import io.asyncer.r2dbc.mysql.internal.util.StringUtils;
import io.netty.handler.ssl.SslContextBuilder;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
+import reactor.core.publisher.Mono;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpResources;
@@ -38,46 +42,29 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.require;
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonEmpty;
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
-import static io.asyncer.r2dbc.mysql.internal.util.InternalArrays.EMPTY_STRINGS;
/**
* A configuration of MySQL connection.
*/
public final class MySqlConnectionConfiguration {
- /**
- * Default MySQL port.
- */
- private static final int DEFAULT_PORT = 3306;
-
- /**
- * {@code true} if {@link #domain} is hostname, otherwise {@link #domain} is unix domain socket path.
- */
- private final boolean isHost;
-
- /**
- * Domain of connecting, may be hostname or unix domain socket path.
- */
- private final String domain;
+ private final SocketClientConfiguration client;
- private final int port;
+ private final SocketConfiguration socket;
private final MySqlSslConfiguration ssl;
- private final boolean tcpKeepAlive;
-
- private final boolean tcpNoDelay;
-
- @Nullable
- private final Duration connectTimeout;
+ private final boolean autoReconnect;
private final boolean preserveInstants;
@@ -87,10 +74,9 @@ public final class MySqlConnectionConfiguration {
private final ZeroDateOption zeroDateOption;
- private final String user;
+ private final Mono user;
- @Nullable
- private final CharSequence password;
+ private final Mono> password;
private final String database;
@@ -120,42 +106,37 @@ public final class MySqlConnectionConfiguration {
private final int zstdCompressionLevel;
- private final LoopResources loopResources;
-
private final Extensions extensions;
- @Nullable
- private final Publisher passwordPublisher;
-
private MySqlConnectionConfiguration(
- boolean isHost, String domain, int port, MySqlSslConfiguration ssl,
- boolean tcpKeepAlive, boolean tcpNoDelay, @Nullable Duration connectTimeout,
+ SocketClientConfiguration client,
+ SocketConfiguration socket,
+ MySqlSslConfiguration ssl,
+ boolean autoReconnect,
ZeroDateOption zeroDateOption,
boolean preserveInstants,
String connectionTimeZone,
boolean forceConnectionTimeZoneToSession,
- String user, @Nullable CharSequence password, @Nullable String database,
+ Mono user,
+ Mono> password,
+ @Nullable String database,
boolean createDatabaseIfNotExist, @Nullable Predicate preferPrepareStatement,
List sessionVariables, @Nullable Duration lockWaitTimeout, @Nullable Duration statementTimeout,
@Nullable Path loadLocalInfilePath, int localInfileBufferSize,
int queryCacheSize, int prepareCacheSize,
Set compressionAlgorithms, int zstdCompressionLevel,
- @Nullable LoopResources loopResources,
- Extensions extensions, @Nullable Publisher passwordPublisher
+ Extensions extensions
) {
- this.isHost = isHost;
- this.domain = domain;
- this.port = port;
- this.tcpKeepAlive = tcpKeepAlive;
- this.tcpNoDelay = tcpNoDelay;
- this.connectTimeout = connectTimeout;
- this.ssl = ssl;
+ this.client = requireNonNull(client, "client must not be null");
+ this.socket = requireNonNull(socket, "socket must not be null");
+ this.ssl = requireNonNull(ssl, "ssl must not be null");
+ this.autoReconnect = autoReconnect;
this.preserveInstants = preserveInstants;
this.connectionTimeZone = requireNonNull(connectionTimeZone, "connectionTimeZone must not be null");
this.forceConnectionTimeZoneToSession = forceConnectionTimeZoneToSession;
this.zeroDateOption = requireNonNull(zeroDateOption, "zeroDateOption must not be null");
this.user = requireNonNull(user, "user must not be null");
- this.password = password;
+ this.password = requireNonNull(password, "password must not be null");
this.database = database == null || database.isEmpty() ? "" : database;
this.createDatabaseIfNotExist = createDatabaseIfNotExist;
this.preferPrepareStatement = preferPrepareStatement;
@@ -168,9 +149,7 @@ private MySqlConnectionConfiguration(
this.prepareCacheSize = prepareCacheSize;
this.compressionAlgorithms = compressionAlgorithms;
this.zstdCompressionLevel = zstdCompressionLevel;
- this.loopResources = loopResources == null ? TcpResources.get() : loopResources;
this.extensions = extensions;
- this.passwordPublisher = passwordPublisher;
}
/**
@@ -182,33 +161,20 @@ public static Builder builder() {
return new Builder();
}
- boolean isHost() {
- return isHost;
- }
-
- String getDomain() {
- return domain;
+ SocketClientConfiguration getClient() {
+ return client;
}
- int getPort() {
- return port;
- }
-
- @Nullable
- Duration getConnectTimeout() {
- return connectTimeout;
+ SocketConfiguration getSocket() {
+ return socket;
}
MySqlSslConfiguration getSsl() {
return ssl;
}
- boolean isTcpKeepAlive() {
- return this.tcpKeepAlive;
- }
-
- boolean isTcpNoDelay() {
- return this.tcpNoDelay;
+ boolean isAutoReconnect() {
+ return autoReconnect;
}
ZeroDateOption getZeroDateOption() {
@@ -223,17 +189,25 @@ String getConnectionTimeZone() {
return connectionTimeZone;
}
- boolean isForceConnectionTimeZoneToSession() {
- return forceConnectionTimeZoneToSession;
+ @Nullable
+ ZoneId retrieveConnectionZoneId() {
+ String timeZone = this.connectionTimeZone;
+
+ if ("LOCAL".equalsIgnoreCase(timeZone)) {
+ return ZoneId.systemDefault().normalized();
+ } else if ("SERVER".equalsIgnoreCase(timeZone)) {
+ return null;
+ }
+
+ return StringUtils.parseZoneId(timeZone);
}
- String getUser() {
- return user;
+ boolean isForceConnectionTimeZoneToSession() {
+ return forceConnectionTimeZoneToSession;
}
- @Nullable
- CharSequence getPassword() {
- return password;
+ Mono getCredential() {
+ return Mono.zip(user, password, (u, p) -> new Credential(u, p.orElse(null)));
}
String getDatabase() {
@@ -288,19 +262,10 @@ int getZstdCompressionLevel() {
return zstdCompressionLevel;
}
- LoopResources getLoopResources() {
- return loopResources;
- }
-
Extensions getExtensions() {
return extensions;
}
- @Nullable
- Publisher getPasswordPublisher() {
- return passwordPublisher;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -309,20 +274,19 @@ public boolean equals(Object o) {
if (!(o instanceof MySqlConnectionConfiguration)) {
return false;
}
+
MySqlConnectionConfiguration that = (MySqlConnectionConfiguration) o;
- return isHost == that.isHost &&
- domain.equals(that.domain) &&
- port == that.port &&
+
+ return client.equals(that.client) &&
+ socket.equals(that.socket) &&
ssl.equals(that.ssl) &&
- tcpKeepAlive == that.tcpKeepAlive &&
- tcpNoDelay == that.tcpNoDelay &&
- Objects.equals(connectTimeout, that.connectTimeout) &&
+ autoReconnect == that.autoReconnect &&
preserveInstants == that.preserveInstants &&
- Objects.equals(connectionTimeZone, that.connectionTimeZone) &&
+ connectionTimeZone.equals(that.connectionTimeZone) &&
forceConnectionTimeZoneToSession == that.forceConnectionTimeZoneToSession &&
zeroDateOption == that.zeroDateOption &&
user.equals(that.user) &&
- Objects.equals(password, that.password) &&
+ password.equals(that.password) &&
database.equals(that.database) &&
createDatabaseIfNotExist == that.createDatabaseIfNotExist &&
Objects.equals(preferPrepareStatement, that.preferPrepareStatement) &&
@@ -335,16 +299,22 @@ public boolean equals(Object o) {
prepareCacheSize == that.prepareCacheSize &&
compressionAlgorithms.equals(that.compressionAlgorithms) &&
zstdCompressionLevel == that.zstdCompressionLevel &&
- Objects.equals(loopResources, that.loopResources) &&
- extensions.equals(that.extensions) &&
- Objects.equals(passwordPublisher, that.passwordPublisher);
+ extensions.equals(that.extensions);
}
@Override
public int hashCode() {
- return Objects.hash(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, connectTimeout,
- preserveInstants, connectionTimeZone, forceConnectionTimeZoneToSession,
- zeroDateOption, user, password, database, createDatabaseIfNotExist,
+ return Objects.hash(
+ client,
+ socket, ssl,
+ autoReconnect,
+ preserveInstants,
+ connectionTimeZone,
+ forceConnectionTimeZoneToSession,
+ zeroDateOption,
+ user, password,
+ database,
+ createDatabaseIfNotExist,
preferPrepareStatement,
sessionVariables,
lockWaitTimeout,
@@ -352,40 +322,23 @@ public int hashCode() {
loadLocalInfilePath, localInfileBufferSize,
queryCacheSize, prepareCacheSize,
compressionAlgorithms, zstdCompressionLevel,
- loopResources, extensions, passwordPublisher);
+ extensions);
}
@Override
public String toString() {
- if (isHost) {
- return "MySqlConnectionConfiguration{host='" + domain + "', port=" + port + ", ssl=" + ssl +
- ", tcpNoDelay=" + tcpNoDelay + ", tcpKeepAlive=" + tcpKeepAlive +
- ", connectTimeout=" + connectTimeout +
- ", preserveInstants=" + preserveInstants +
- ", connectionTimeZone=" + connectionTimeZone +
- ", forceConnectionTimeZoneToSession=" + forceConnectionTimeZoneToSession +
- ", zeroDateOption=" + zeroDateOption + ", user='" + user + "', password=" + password +
- ", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist +
- ", preferPrepareStatement=" + preferPrepareStatement +
- ", sessionVariables=" + sessionVariables +
- ", lockWaitTimeout=" + lockWaitTimeout +
- ", statementTimeout=" + statementTimeout +
- ", loadLocalInfilePath=" + loadLocalInfilePath +
- ", localInfileBufferSize=" + localInfileBufferSize +
- ", queryCacheSize=" + queryCacheSize + ", prepareCacheSize=" + prepareCacheSize +
- ", compressionAlgorithms=" + compressionAlgorithms +
- ", zstdCompressionLevel=" + zstdCompressionLevel +
- ", loopResources=" + loopResources +
- ", extensions=" + extensions + ", passwordPublisher=" + passwordPublisher + '}';
- }
-
- return "MySqlConnectionConfiguration{unixSocket='" + domain +
- "', connectTimeout=" + connectTimeout +
+ return "MySqlConnectionConfiguration{client=" + client +
+ ", socket=" + socket +
+ ", ssl=" + ssl +
+ ", autoReconnect=" + autoReconnect +
", preserveInstants=" + preserveInstants +
- ", connectionTimeZone=" + connectionTimeZone +
+ ", connectionTimeZone='" + connectionTimeZone + '\'' +
", forceConnectionTimeZoneToSession=" + forceConnectionTimeZoneToSession +
- ", zeroDateOption=" + zeroDateOption + ", user='" + user + "', password=" + password +
- ", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist +
+ ", zeroDateOption=" + zeroDateOption +
+ ", user=" + user +
+ ", password=REDACTED" +
+ ", database='" + database + '\'' +
+ ", createDatabaseIfNotExist=" + createDatabaseIfNotExist +
", preferPrepareStatement=" + preferPrepareStatement +
", sessionVariables=" + sessionVariables +
", lockWaitTimeout=" + lockWaitTimeout +
@@ -396,8 +349,8 @@ public String toString() {
", prepareCacheSize=" + prepareCacheSize +
", compressionAlgorithms=" + compressionAlgorithms +
", zstdCompressionLevel=" + zstdCompressionLevel +
- ", loopResources=" + loopResources +
- ", extensions=" + extensions + ", passwordPublisher=" + passwordPublisher + '}';
+ ", extensions=" + extensions +
+ '}';
}
/**
@@ -405,24 +358,28 @@ public String toString() {
*/
public static final class Builder {
+ private final SocketClientConfiguration.Builder client = new SocketClientConfiguration.Builder();
+
@Nullable
- private String database;
+ private TcpSocketConfiguration.Builder tcpSocket;
- private boolean createDatabaseIfNotExist;
+ @Nullable
+ private UnixDomainSocketConfiguration.Builder unixSocket;
- private boolean isHost = true;
+ private final MySqlSslConfiguration.Builder ssl = new MySqlSslConfiguration.Builder();
- private String domain;
+ private boolean autoReconnect;
@Nullable
- private CharSequence password;
+ private String database;
- private int port = DEFAULT_PORT;
+ private boolean createDatabaseIfNotExist;
@Nullable
- private Duration connectTimeout;
+ private Mono user;
- private String user;
+ @Nullable
+ private Mono password;
private ZeroDateOption zeroDateOption = ZeroDateOption.USE_NULL;
@@ -432,33 +389,6 @@ public static final class Builder {
private boolean forceConnectionTimeZoneToSession;
- @Nullable
- private SslMode sslMode;
-
- private String[] tlsVersion = EMPTY_STRINGS;
-
- @Nullable
- private HostnameVerifier sslHostnameVerifier;
-
- @Nullable
- private String sslCa;
-
- @Nullable
- private String sslKey;
-
- @Nullable
- private CharSequence sslKeyPassword;
-
- @Nullable
- private String sslCert;
-
- @Nullable
- private Function sslContextBuilderCustomizer;
-
- private boolean tcpKeepAlive;
-
- private boolean tcpNoDelay;
-
@Nullable
private Predicate preferPrepareStatement;
@@ -484,58 +414,66 @@ public static final class Builder {
private int zstdCompressionLevel = 3;
- @Nullable
- private LoopResources loopResources;
-
private boolean autodetectExtensions = true;
private final List extensions = new ArrayList<>();
- @Nullable
- private Publisher passwordPublisher;
-
/**
* Builds an immutable {@link MySqlConnectionConfiguration} with current options.
*
* @return the {@link MySqlConnectionConfiguration}.
*/
public MySqlConnectionConfiguration build() {
- SslMode sslMode = requireSslMode();
-
- if (isHost) {
- requireNonNull(domain, "host must not be null when using TCP socket");
- require((sslCert == null && sslKey == null) || (sslCert != null && sslKey != null),
- "sslCert and sslKey must be both null or both non-null");
+ Mono user = requireNonNull(this.user, "User must be configured");
+ Mono auth = this.password;
+ Mono> password = auth == null ? Mono.just(Optional.empty()) : auth.singleOptional();
+ SocketConfiguration socket;
+ boolean preferredSsl;
+
+ if (unixSocket == null) {
+ socket = requireNonNull(tcpSocket, "Connection must be either TCP/SSL or Unix Domain Socket").build();
+ preferredSsl = true;
} else {
- requireNonNull(domain, "unixSocket must not be null when using unix domain socket");
- require(!sslMode.startSsl(), "sslMode must be disabled when using unix domain socket");
+ // Since 1.2.0, we support SSL over Unix Domain Socket, default SSL mode is DISABLED.
+ // But, if a Unix Domain Socket can be listened to by someone, this indicates that the system itself
+ // has been compromised, and enabling SSL does not improve the security of the connection.
+ socket = unixSocket.build();
+ preferredSsl = false;
}
int prepareCacheSize = preferPrepareStatement == null ? 0 : this.prepareCacheSize;
- MySqlSslConfiguration ssl = MySqlSslConfiguration.create(sslMode, tlsVersion, sslHostnameVerifier,
- sslCa, sslKey, sslKeyPassword, sslCert, sslContextBuilderCustomizer);
- return new MySqlConnectionConfiguration(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay,
- connectTimeout, zeroDateOption,
+ return new MySqlConnectionConfiguration(
+ client.build(),
+ socket,
+ ssl.build(preferredSsl),
+ autoReconnect,
+ zeroDateOption,
preserveInstants,
connectionTimeZone,
forceConnectionTimeZoneToSession,
- user, password, database,
- createDatabaseIfNotExist, preferPrepareStatement,
+ user.single(),
+ password,
+ database,
+ createDatabaseIfNotExist,
+ preferPrepareStatement,
sessionVariables,
lockWaitTimeout,
statementTimeout,
loadLocalInfilePath,
- localInfileBufferSize, queryCacheSize, prepareCacheSize,
- compressionAlgorithms, zstdCompressionLevel, loopResources,
- Extensions.from(extensions, autodetectExtensions), passwordPublisher);
+ localInfileBufferSize,
+ queryCacheSize,
+ prepareCacheSize,
+ compressionAlgorithms,
+ zstdCompressionLevel,
+ Extensions.from(extensions, autodetectExtensions));
}
/**
* Configures the database. Default no database.
*
* @param database the database, or {@code null} if no database want to be login.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @since 0.8.1
*/
public Builder database(@Nullable String database) {
@@ -548,7 +486,7 @@ public Builder database(@Nullable String database) {
* {@code false}.
*
* @param enabled to discover and register extensions.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @since 1.0.6
*/
public Builder createDatabaseIfNotExist(boolean enabled) {
@@ -558,58 +496,138 @@ public Builder createDatabaseIfNotExist(boolean enabled) {
/**
* Configures the Unix Domain Socket to connect to.
+ *
+ * Note: It will override all TCP and SSL configurations if configured.
*
- * @param unixSocket the socket file path.
- * @return this {@link Builder}.
- * @throws IllegalArgumentException if {@code unixSocket} is {@code null}.
+ * @param path the socket file path.
+ * @return {@link Builder this}
+ * @throws IllegalArgumentException if {@code path} is {@code null} or empty.
* @since 0.8.1
*/
- public Builder unixSocket(String unixSocket) {
- this.domain = requireNonNull(unixSocket, "unixSocket must not be null");
- this.isHost = false;
+ public Builder unixSocket(String path) {
+ requireNonEmpty(path, "path must not be null");
+
+ requireUnixSocket().path(path);
return this;
}
/**
- * Configures the host.
+ * Configures the single-host.
+ *
+ * Note: Used only if the {@link #unixSocket(String)} and {@link #addHost multiple hosts} is not configured.
*
* @param host the host.
- * @return this {@link Builder}.
- * @throws IllegalArgumentException if {@code host} is {@code null}.
+ * @return {@link Builder this}
+ * @throws IllegalArgumentException if {@code host} is {@code null} or empty.
* @since 0.8.1
*/
public Builder host(String host) {
- this.domain = requireNonNull(host, "host must not be null");
- this.isHost = true;
+ requireNonEmpty(host, "host must not be empty");
+
+ requireTcpSocket().host(host);
return this;
}
/**
- * Configures the password. Default login without password.
+ * Configures the port of {@link #host(String)}. Defaults to {@code 3306}.
*
- * Note: for memory security, should not use intern {@link String} for password.
+ * Note: Used only if the {@link #unixSocket(String)} and {@link #addHost multiple hosts} is not configured.
*
- * @param password the password, or {@code null} when user has no password.
- * @return this {@link Builder}.
+ * @param port the port.
+ * @return {@link Builder this}
+ * @throws IllegalArgumentException if the {@code port} is negative or bigger than {@literal 65535}.
* @since 0.8.1
*/
- public Builder password(@Nullable CharSequence password) {
- this.password = password;
+ public Builder port(int port) {
+ require(port >= 0 && port <= 0xFFFF, "port must be between 0 and 65535");
+
+ requireTcpSocket().port(port);
return this;
}
/**
- * Configures the port. Defaults to {@code 3306}.
+ * Adds a host with default port 3306 to the list of multiple hosts to connect to.
+ *
+ * Note: Used only if the {@link #unixSocket(String)} and {@link #host single host} is not configured.
*
- * @param port the port.
- * @return this {@link Builder}.
- * @throws IllegalArgumentException if the {@code port} is negative or bigger than {@literal 65535}.
- * @since 0.8.1
+ * @param host the host to add.
+ * @return {@link Builder this}
+ * @throws IllegalArgumentException if {@code host} is {@code null} or empty.
+ * @since 1.2.0
*/
- public Builder port(int port) {
+ public Builder addHost(String host) {
+ requireNonEmpty(host, "host must not be empty");
+
+ requireTcpSocket().addHost(host);
+ return this;
+ }
+
+ /**
+ * Adds a host to the list of multiple hosts to connect to.
+ *
+ * Note: Used only if the {@link #unixSocket(String)} and {@link #host single host} is not configured.
+ *
+ * @param host the host to add.
+ * @param port the port of the host.
+ * @return {@link Builder this}
+ * @throws IllegalArgumentException if the {@code host} is empty or the {@code port} is not between 0 and
+ * 65535.
+ * @since 1.2.0
+ */
+ public Builder addHost(String host, int port) {
+ requireNonEmpty(host, "host must not be empty");
require(port >= 0 && port <= 0xFFFF, "port must be between 0 and 65535");
- this.port = port;
+ requireTcpSocket().addHost(host, port);
+ return this;
+ }
+
+ /**
+ * Configures the failover and high availability protocol driver. Default to {@link ProtocolDriver#MYSQL}. Used
+ * only if the {@link #unixSocket(String)} is not configured.
+ *
+ * @param driver the protocol driver.
+ * @return {@link Builder this}
+ * @throws IllegalArgumentException if {@code driver} is {@code null}.
+ * @since 1.2.0
+ */
+ public Builder driver(ProtocolDriver driver) {
+ requireNonNull(driver, "driver must not be null");
+
+ requireTcpSocket().driver(driver);
+ return this;
+ }
+
+ /**
+ * Configures the failover and high availability protocol. Default to {@link HaProtocol#DEFAULT}. Used only if
+ * the {@link #unixSocket(String)} is not configured.
+ *
+ * @param protocol the failover and high availability protocol.
+ * @return {@link Builder this}
+ * @throws IllegalArgumentException if {@code protocol} is {@code null}.
+ * @since 1.2.0
+ */
+ public Builder protocol(HaProtocol protocol) {
+ requireNonNull(protocol, "protocol must not be null");
+
+ requireTcpSocket().protocol(protocol);
+ return this;
+ }
+
+ /**
+ * Configures whether to perform failover reconnection. Default is {@code false}.
+ *
+ * It is not recommended due to it may lead to unexpected results. For example, it may recover a transaction
+ * state from a failed server node to an available node, the user can not aware of it, and continuing to execute
+ * more queries in the transaction will lead to unexpected inconsistencies.
+ *
+ * @param enabled {@code true} to enable failover reconnection.
+ * @return {@link Builder this}
+ * @see JDBC Failover
+ * @since 1.2.0
+ */
+ public Builder autoReconnect(boolean enabled) {
+ this.autoReconnect = enabled;
return this;
}
@@ -617,11 +635,11 @@ public Builder port(int port) {
* Configures the connection timeout. Default no timeout.
*
* @param connectTimeout the connection timeout, or {@code null} if no timeout.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @since 0.8.1
*/
public Builder connectTimeout(@Nullable Duration connectTimeout) {
- this.connectTimeout = connectTimeout;
+ this.client.connectTimeout(connectTimeout);
return this;
}
@@ -629,20 +647,52 @@ public Builder connectTimeout(@Nullable Duration connectTimeout) {
* Configures the user for login the database.
*
* @param user the user.
- * @return this {@link Builder}.
- * @throws IllegalArgumentException if {@code user} is {@code null}.
+ * @return {@link Builder this}
+ * @throws IllegalArgumentException if {@code user} is {@code null} or empty.
* @since 0.8.2
*/
public Builder user(String user) {
- this.user = requireNonNull(user, "user must not be null");
+ requireNonEmpty(user, "user must not be empty");
+
+ this.user = Mono.just(user);
return this;
}
/**
- * An alias of {@link #user(String)}.
+ * Configures the user for login the database.
+ *
+ * @param user a {@link Supplier} to retrieve user.
+ * @return {@link Builder this}
+ * @throws IllegalArgumentException if {@code user} is {@code null}.
+ * @since 1.2.0
+ */
+ public Builder user(Supplier user) {
+ requireNonNull(user, "user must not be null");
+
+ this.user = Mono.fromSupplier(user);
+ return this;
+ }
+
+ /**
+ * Configures the user for login the database.
+ *
+ * @param user a {@link Publisher} to retrieve user.
+ * @return {@link Builder this}
+ * @throws IllegalArgumentException if {@code user} is {@code null}.
+ * @since 1.2.0
+ */
+ public Builder user(Publisher user) {
+ requireNonNull(user, "user must not be null");
+
+ this.user = Mono.from(user);
+ return this;
+ }
+
+ /**
+ * Configures the user for login the database. Since 0.8.2, it is an alias of {@link #user(String)}.
*
* @param user the user.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @throws IllegalArgumentException if {@code user} is {@code null}.
* @since 0.8.1
*/
@@ -651,8 +701,52 @@ public Builder username(String user) {
}
/**
- * Configures the time zone conversion. Default to {@code true} means enable conversion between JVM
- * and {@link #connectionTimeZone(String)}.
+ * Configures the password. Default login without password.
+ *
+ * Note: for memory security, should not use intern {@link String} for password.
+ *
+ * @param password the password, or {@code null} when user has no password.
+ * @return {@link Builder this}
+ * @since 0.8.1
+ */
+ public Builder password(@Nullable CharSequence password) {
+ this.password = Mono.justOrEmpty(password);
+ return this;
+ }
+
+ /**
+ * Configures the password. Default login without password.
+ *
+ * @param password a {@link Supplier} to retrieve password.
+ * @return {@link Builder this}
+ * @throws IllegalArgumentException if {@code password} is {@code null}.
+ * @since 1.2.0
+ */
+ public Builder password(Supplier extends CharSequence> password) {
+ requireNonNull(password, "password must not be null");
+
+ this.password = Mono.fromSupplier(password);
+ return this;
+ }
+
+ /**
+ * Configures the password. Default login without password.
+ *
+ * @param password a {@link Publisher} to retrieve password.
+ * @return {@link Builder this}
+ * @throws IllegalArgumentException if {@code password} is {@code null}.
+ * @since 1.2.0
+ */
+ public Builder password(Publisher extends CharSequence> password) {
+ requireNonNull(password, "password must not be null");
+
+ this.password = Mono.from(password);
+ return this;
+ }
+
+ /**
+ * Configures the time zone conversion. Default to {@code true} means enable conversion between JVM and
+ * {@link #connectionTimeZone(String)}.
*
* Note: disable it will ignore the time zone of connection, and use the JVM local time zone.
*
@@ -682,8 +776,8 @@ public Builder connectionTimeZone(String connectionTimeZone) {
}
/**
- * Configures to force the connection time zone to session time zone. Default to {@code false}. Used
- * only if the {@link #connectionTimeZone(String)} is not {@code "SERVER"}.
+ * Configures to force the connection time zone to session time zone. Default to {@code false}. Used only if
+ * the {@link #connectionTimeZone(String)} is not {@code "SERVER"}.
*
* Note: alter the time zone of session will affect the results of MySQL date/time functions, e.g.
* {@code NOW([n])}, {@code CURRENT_TIME([n])}, {@code CURRENT_DATE()}, etc. Please use with caution.
@@ -711,11 +805,11 @@ public Builder serverZoneId(@Nullable ZoneId serverZoneId) {
}
/**
- * Configures the {@link ZeroDateOption}. Default to {@link ZeroDateOption#USE_NULL}. It is a
- * behavior option when this driver receives a value of zero-date.
+ * Configures the {@link ZeroDateOption}. Default to {@link ZeroDateOption#USE_NULL}. It is a behavior option
+ * when this driver receives a value of zero-date.
*
* @param zeroDate the {@link ZeroDateOption}.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @throws IllegalArgumentException if {@code zeroDate} is {@code null}.
* @since 0.8.1
*/
@@ -726,46 +820,46 @@ public Builder zeroDateOption(ZeroDateOption zeroDate) {
/**
* Configures ssl mode. See also {@link SslMode}.
+ *
+ * Note: It is used only if the {@link #unixSocket(String)} is not configured.
*
* @param sslMode the SSL mode to use.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @throws IllegalArgumentException if {@code sslMode} is {@code null}.
* @since 0.8.1
*/
public Builder sslMode(SslMode sslMode) {
- this.sslMode = requireNonNull(sslMode, "sslMode must not be null");
+ requireNonNull(sslMode, "sslMode must not be null");
+
+ this.ssl.sslMode(sslMode);
return this;
}
/**
* Configures TLS versions, see {@link io.asyncer.r2dbc.mysql.constant.TlsVersions TlsVersions}.
+ *
+ * Note: It is used only if the {@link #unixSocket(String)} is not configured.
*
* @param tlsVersion TLS versions.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @throws IllegalArgumentException if the array {@code tlsVersion} is {@code null}.
* @since 0.8.1
*/
public Builder tlsVersion(String... tlsVersion) {
requireNonNull(tlsVersion, "tlsVersion must not be null");
- int size = tlsVersion.length;
-
- if (size > 0) {
- String[] versions = new String[size];
- System.arraycopy(tlsVersion, 0, versions, 0, size);
- this.tlsVersion = versions;
- } else {
- this.tlsVersion = EMPTY_STRINGS;
- }
+ this.ssl.tlsVersions(tlsVersion);
return this;
}
/**
* Configures SSL {@link HostnameVerifier}, it is available only set {@link #sslMode(SslMode)} as
- * {@link SslMode#VERIFY_IDENTITY}. It is useful when server was using special Certificates or need
- * special verification.
+ * {@link SslMode#VERIFY_IDENTITY}. It is useful when server was using special Certificates or need special
+ * verification.
*
* Default is builtin {@link HostnameVerifier} which use RFC standards.
+ *
+ * Note: It is used only if the {@link #unixSocket(String)} is not configured.
*
* @param sslHostnameVerifier the custom {@link HostnameVerifier}.
* @return this {@link Builder}
@@ -773,8 +867,9 @@ public Builder tlsVersion(String... tlsVersion) {
* @since 0.8.2
*/
public Builder sslHostnameVerifier(HostnameVerifier sslHostnameVerifier) {
- this.sslHostnameVerifier = requireNonNull(sslHostnameVerifier,
- "sslHostnameVerifier must not be null");
+ requireNonNull(sslHostnameVerifier, "sslHostnameVerifier must not be null");
+
+ this.ssl.sslHostnameVerifier(sslHostnameVerifier);
return this;
}
@@ -783,41 +878,47 @@ public Builder sslHostnameVerifier(HostnameVerifier sslHostnameVerifier) {
* {@link #sslMode(SslMode)} is configured for verify server certification.
*
* Default is {@code null}, which means that the default algorithm is used for the trust manager.
+ *
+ * Note: It is used only if the {@link #unixSocket(String)} is not configured.
*
* @param sslCa an X.509 certificate chain file in PEM format.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @since 0.8.1
*/
public Builder sslCa(@Nullable String sslCa) {
- this.sslCa = sslCa;
+ this.ssl.sslCa(sslCa);
return this;
}
/**
* Configures client SSL certificate for client authentication.
*
- * The {@link #sslCert} and {@link #sslKey} must be both non-{@code null} or both {@code null}.
+ * It and {@link #sslKey} must be both non-{@code null} or both {@code null}.
+ *
+ * Note: It is used only if the {@link #unixSocket(String)} is not configured.
*
* @param sslCert an X.509 certificate chain file in PEM format, or {@code null} if no SSL cert.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @since 0.8.2
*/
public Builder sslCert(@Nullable String sslCert) {
- this.sslCert = sslCert;
+ this.ssl.sslCert(sslCert);
return this;
}
/**
* Configures client SSL key for client authentication.
*
- * The {@link #sslCert} and {@link #sslKey} must be both non-{@code null} or both {@code null}.
+ * It and {@link #sslCert} must be both non-{@code null} or both {@code null}.
+ *
+ * Note: It is used only if the {@link #unixSocket(String)} is not configured.
*
* @param sslKey a PKCS#8 private key file in PEM format, or {@code null} if no SSL key.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @since 0.8.2
*/
public Builder sslKey(@Nullable String sslKey) {
- this.sslKey = sslKey;
+ this.ssl.sslKey(sslKey);
return this;
}
@@ -825,39 +926,42 @@ public Builder sslKey(@Nullable String sslKey) {
* Configures the password of SSL key file for client certificate authentication.
*
* It will be used only if {@link #sslKey} and {@link #sslCert} non-null.
+ *
+ * Note: It is used only if the {@link #unixSocket(String)} is not configured.
*
- * @param sslKeyPassword the password of the {@link #sslKey}, or {@code null} if it's not
- * password-protected.
- * @return this {@link Builder}.
+ * @param sslKeyPassword the password of the {@link #sslKey}, or {@code null} if it's not password-protected.
+ * @return {@link Builder this}
* @since 0.8.2
*/
public Builder sslKeyPassword(@Nullable CharSequence sslKeyPassword) {
- this.sslKeyPassword = sslKeyPassword;
+ this.ssl.sslKeyPassword(sslKeyPassword);
return this;
}
/**
- * Configures a {@link SslContextBuilder} customizer. The customizer gets applied on each SSL
- * connection attempt to allow for just-in-time configuration updates. The {@link Function} gets
- * called with the prepared {@link SslContextBuilder} that has all configuration options applied. The
- * customizer may return the same builder or return a new builder instance to be used to build the SSL
- * context.
+ * Configures a {@link SslContextBuilder} customizer. The customizer gets applied on each SSL connection attempt
+ * to allow for just-in-time configuration updates. The {@link Function} gets called with the prepared
+ * {@link SslContextBuilder} that has all configuration options applied. The customizer may return the same
+ * builder or return a new builder instance to be used to build the SSL context.
+ *
+ * Note: It is used only if the {@link #unixSocket(String)} is not configured.
*
* @param customizer customizer function
* @return this {@link Builder}
* @throws IllegalArgumentException if {@code customizer} is {@code null}
* @since 0.8.1
*/
- public Builder sslContextBuilderCustomizer(
- Function customizer) {
+ public Builder sslContextBuilderCustomizer(Function customizer) {
requireNonNull(customizer, "sslContextBuilderCustomizer must not be null");
- this.sslContextBuilderCustomizer = customizer;
+ this.ssl.sslContextBuilderCustomizer(customizer);
return this;
}
/**
* Configures TCP KeepAlive.
+ *
+ * Note: It is used only if the {@link #unixSocket(String)} is not configured.
*
* @param enabled whether to enable TCP KeepAlive
* @return this {@link Builder}
@@ -865,12 +969,14 @@ public Builder sslContextBuilderCustomizer(
* @since 0.8.2
*/
public Builder tcpKeepAlive(boolean enabled) {
- this.tcpKeepAlive = enabled;
+ requireTcpSocket().tcpKeepAlive(enabled);
return this;
}
/**
* Configures TCP NoDelay.
+ *
+ * Note: It is used only if the {@link #unixSocket(String)} is not configured.
*
* @param enabled whether to enable TCP NoDelay
* @return this {@link Builder}
@@ -878,15 +984,14 @@ public Builder tcpKeepAlive(boolean enabled) {
* @since 0.8.2
*/
public Builder tcpNoDelay(boolean enabled) {
- this.tcpNoDelay = enabled;
+ requireTcpSocket().tcpNoDelay(enabled);
return this;
}
/**
* Configures the protocol of parameterized statements to the text protocol.
*
- * The text protocol is default protocol that's using client-preparing. See also MySQL
- * documentations.
+ * The text protocol is default protocol that's using client-preparing. See also MySQL documentations.
*
* @return this {@link Builder}
* @since 0.8.1
@@ -899,10 +1004,9 @@ public Builder useClientPrepareStatement() {
/**
* Configures the protocol of parameterized statements to the binary protocol.
*
- * The binary protocol is compact protocol that's using server-preparing. See also MySQL
- * documentations.
+ * The binary protocol is compact protocol that's using server-preparing. See also MySQL documentations.
*
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @since 0.8.1
*/
public Builder useServerPrepareStatement() {
@@ -910,19 +1014,18 @@ public Builder useServerPrepareStatement() {
}
/**
- * Configures the protocol of parameterized statements and prepare-preferred simple statements to the
- * binary protocol.
+ * Configures the protocol of parameterized statements and prepare-preferred simple statements to the binary
+ * protocol.
*
- * The {@code preferPrepareStatement} configures whether to prefer prepare execution on a
- * statement-by-statement basis (simple statements). The {@link Predicate} accepts the simple SQL
- * query string and returns a boolean flag indicating preference. {@code true} prepare-preferred,
- * {@code false} prefers direct execution (text protocol). Defaults to direct execution.
+ * The {@code preferPrepareStatement} configures whether to prefer prepare execution on a statement-by-statement
+ * basis (simple statements). The {@link Predicate} accepts the simple SQL query string and returns a boolean
+ * flag indicating preference. {@code true} prepare-preferred, {@code false} prefers direct execution (text
+ * protocol). Defaults to direct execution.
*
- * The binary protocol is compact protocol that's using server-preparing. See also MySQL
- * documentations.
+ * The binary protocol is compact protocol that's using server-preparing. See also MySQL documentations.
*
* @param preferPrepareStatement the above {@link Predicate}.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @throws IllegalArgumentException if {@code preferPrepareStatement} is {@code null}.
* @since 0.8.1
*/
@@ -934,8 +1037,8 @@ public Builder useServerPrepareStatement(Predicate preferPrepareStatemen
}
/**
- * Configures the session variables, used to set session variables immediately after login. Default no
- * session variables to set. It should be a list of key-value pairs. e.g.
+ * Configures the session variables, used to set session variables immediately after login. Default no session
+ * variables to set. It should be a list of key-value pairs. e.g.
* {@code ["sql_mode='ANSI_QUOTES,STRICT_TRANS_TABLES'", "time_zone=00:00"]}.
*
* @param sessionVariables the session variables to set.
@@ -951,7 +1054,7 @@ public Builder sessionVariables(String... sessionVariables) {
}
/**
- * Configures the lock wait timeout. Default to use the server-side default value.
+ * <<<<<<< HEAD Configures the lock wait timeout. Default to use the server-side default value.
*
* @param lockWaitTimeout the lock wait timeout, or {@code null} to use the server-side default value.
* @return {@link Builder this}
@@ -975,8 +1078,8 @@ public Builder statementTimeout(@Nullable Duration statementTimeout) {
}
/**
- * Configures to allow the {@code LOAD DATA LOCAL INFILE} statement in the given {@code path} or
- * disallow the statement. Default to {@code null} which means not allow the statement.
+ * Configures to allow the {@code LOAD DATA LOCAL INFILE} statement in the given {@code path} or disallow the
+ * statement. Default to {@code null} which means not allow the statement.
*
* @param path which parent path are allowed to load file data, {@code null} means not be allowed.
* @return {@link Builder this}.
@@ -1007,14 +1110,14 @@ public Builder localInfileBufferSize(int localInfileBufferSize) {
}
/**
- * Configures the maximum size of the {@link Query} parsing cache. Usually it should be power of two.
- * Default to {@code 0}. Driver will use unbounded cache if size is less than {@code 0}.
+ * Configures the maximum size of the {@link Query} parsing cache. Usually it should be power of two. Default to
+ * {@code 0}. Driver will use unbounded cache if size is less than {@code 0}.
*
- * Notice: the cache is using EL model (the PACELC theorem) which provider better performance. That
- * means it is an elastic cache. So this size is not a hard-limit. It should be over 16 in average.
+ * Notice: the cache is using EL model (the PACELC theorem) which provider better performance. That means it is
+ * an elastic cache. So this size is not a hard-limit. It should be over 16 in average.
*
* @param queryCacheSize the above size, {@code 0} means no cache, {@code -1} means unbounded cache.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @since 0.8.3
*/
public Builder queryCacheSize(int queryCacheSize) {
@@ -1023,19 +1126,17 @@ public Builder queryCacheSize(int queryCacheSize) {
}
/**
- * Configures the maximum size of the server-preparing cache. Usually it should be power of two.
- * Default to {@code 256}. Driver will use unbounded cache if size is less than {@code 0}. It is used
- * only if using server-preparing parameterized statements, i.e. the {@link #useServerPrepareStatement}
- * is set.
+ * Configures the maximum size of the server-preparing cache. Usually it should be power of two. Default to
+ * {@code 256}. Driver will use unbounded cache if size is less than {@code 0}. It is used only if using
+ * server-preparing parameterized statements, i.e. the {@link #useServerPrepareStatement} is set.
*
- * Notice: the cache is using EC model (the PACELC theorem) for ensure consistency. Consistency is
- * very important because MySQL contains a hard limit of all server-prepared statements which has been
- * opened, see also {@code max_prepared_stmt_count}. And, the cache is one-to-one connection, which
- * means it will not work on thread-concurrency.
+ * Notice: the cache is using EC model (the PACELC theorem) for ensure consistency. Consistency is very
+ * important because MySQL contains a hard limit of all server-prepared statements which has been opened, see
+ * also {@code max_prepared_stmt_count}. And, the cache is one-to-one connection, which means it will not work
+ * on thread-concurrency.
*
- * @param prepareCacheSize the above size, {@code 0} means no cache, {@code -1} means unbounded
- * cache.
- * @return this {@link Builder}.
+ * @param prepareCacheSize the above size, {@code 0} means no cache, {@code -1} means unbounded cache.
+ * @return {@link Builder this}
* @since 0.8.3
*/
public Builder prepareCacheSize(int prepareCacheSize) {
@@ -1046,10 +1147,9 @@ public Builder prepareCacheSize(int prepareCacheSize) {
/**
* Configures the compression algorithms. Default to [{@link CompressionAlgorithm#UNCOMPRESSED}].
*
- * It will auto choose an algorithm that's contained in the list and supported by the server,
- * preferring zstd, then zlib. If the list does not contain {@link CompressionAlgorithm#UNCOMPRESSED}
- * and the server does not support any algorithm in the list, an exception will be thrown when
- * connecting.
+ * It will auto choose an algorithm that's contained in the list and supported by the server, preferring zstd,
+ * then zlib. If the list does not contain {@link CompressionAlgorithm#UNCOMPRESSED} and the server does not
+ * support any algorithm in the list, an exception will be thrown when connecting.
*
* Note: zstd requires a dependency {@code com.github.luben:zstd-jni}.
*
@@ -1106,12 +1206,12 @@ public Builder zstdCompressionLevel(int level) {
* {@link TcpResources#get() global tcp resources}.
*
* @param loopResources the {@link LoopResources}.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @throws IllegalArgumentException if {@code loopResources} is {@code null}.
* @since 1.1.2
*/
public Builder loopResources(LoopResources loopResources) {
- this.loopResources = requireNonNull(loopResources, "loopResources must not be null");
+ this.client.loopResources(loopResources);
return this;
}
@@ -1120,7 +1220,7 @@ public Builder loopResources(LoopResources loopResources) {
* {@code true}.
*
* @param enabled to discover and register extensions.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @since 0.8.2
*/
public Builder autodetectExtensions(boolean enabled) {
@@ -1131,12 +1231,12 @@ public Builder autodetectExtensions(boolean enabled) {
/**
* Registers a {@link Extension} to extend driver functionality and manually.
*
- * Notice: the driver will not deduplicate {@link Extension}s of autodetect discovered and manually
- * extended. So if a {@link Extension} is registered by this function and autodetect discovered, it
- * will get two {@link Extension} as same.
+ * Notice: the driver will not deduplicate {@link Extension}s of autodetect discovered and manually extended. So
+ * if a {@link Extension} is registered by this function and autodetect discovered, it will get two
+ * {@link Extension} as same.
*
* @param extension extension to extend driver functionality.
- * @return this {@link Builder}.
+ * @return {@link Builder this}
* @throws IllegalArgumentException if {@code extension} is {@code null}.
* @since 0.8.2
*/
@@ -1146,26 +1246,36 @@ public Builder extendWith(Extension extension) {
}
/**
- * Registers a password publisher function.
+ * Registers a password publisher function. Since 1.2.0, it is an alias of {@link #password(Publisher)}.
*
- * @param passwordPublisher function to retrieve password before making connection.
- * @return this {@link Builder}.
+ * @param password a {@link Publisher} to retrieve password before making connection.
+ * @return {@link Builder this}
*/
- public Builder passwordPublisher(Publisher passwordPublisher) {
- this.passwordPublisher = passwordPublisher;
- return this;
+ public Builder passwordPublisher(Publisher extends CharSequence> password) {
+ return password(password);
}
- private SslMode requireSslMode() {
- SslMode sslMode = this.sslMode;
+ private TcpSocketConfiguration.Builder requireTcpSocket() {
+ TcpSocketConfiguration.Builder tcpSocket = this.tcpSocket;
- if (sslMode == null) {
- sslMode = isHost ? SslMode.PREFERRED : SslMode.DISABLED;
+ if (tcpSocket == null) {
+ this.tcpSocket = tcpSocket = new TcpSocketConfiguration.Builder();
}
- return sslMode;
+ return tcpSocket;
}
- private Builder() { }
+ private UnixDomainSocketConfiguration.Builder requireUnixSocket() {
+ UnixDomainSocketConfiguration.Builder unixSocket = this.unixSocket;
+
+ if (unixSocket == null) {
+ this.unixSocket = unixSocket = new UnixDomainSocketConfiguration.Builder();
+ }
+
+ return unixSocket;
+ }
+
+ private Builder() {
+ }
}
}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java
index d003db2b0..478ff7794 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java
@@ -19,19 +19,11 @@
import io.asyncer.r2dbc.mysql.api.MySqlConnection;
import io.asyncer.r2dbc.mysql.cache.Caches;
import io.asyncer.r2dbc.mysql.cache.QueryCache;
-import io.asyncer.r2dbc.mysql.client.Client;
-import io.asyncer.r2dbc.mysql.internal.util.StringUtils;
-import io.netty.channel.unix.DomainSocketAddress;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryMetadata;
import org.jetbrains.annotations.Nullable;
-import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.time.ZoneId;
-import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
@@ -69,127 +61,28 @@ public static MySqlConnectionFactory from(MySqlConnectionConfiguration configura
LazyQueryCache queryCache = new LazyQueryCache(configuration.getQueryCacheSize());
- return new MySqlConnectionFactory(Mono.defer(() -> {
- MySqlSslConfiguration ssl;
- SocketAddress address;
-
- if (configuration.isHost()) {
- ssl = configuration.getSsl();
- address = InetSocketAddress.createUnresolved(configuration.getDomain(),
- configuration.getPort());
- } else {
- ssl = MySqlSslConfiguration.disabled();
- address = new DomainSocketAddress(configuration.getDomain());
- }
-
- String user = configuration.getUser();
- CharSequence password = configuration.getPassword();
- Publisher passwordPublisher = configuration.getPasswordPublisher();
-
- if (Objects.nonNull(passwordPublisher)) {
- return Mono.from(passwordPublisher).flatMap(token -> getMySqlConnection(
- configuration, ssl,
- queryCache,
- address,
- user,
- token
- ));
- }
-
- return getMySqlConnection(
- configuration, ssl,
- queryCache,
- address,
- user,
- password
- );
- }));
- }
-
- /**
- * Gets an initialized {@link MySqlConnection} from authentication credential and configurations.
- *
- * It contains following steps:
- *
Create connection context
- *
Connect to MySQL server with TCP or Unix Domain Socket
- *
Handshake/login and init handshake states
- *
Init session states
- *
- * @param configuration the connection configuration.
- * @param ssl the SSL configuration.
- * @param queryCache lazy-init query cache, it is shared among all connections from the same factory.
- * @param address TCP or Unix Domain Socket address.
- * @param user the user of the authentication.
- * @param password the password of the authentication.
- * @return a {@link MySqlConnection}.
- */
- private static Mono getMySqlConnection(
- final MySqlConnectionConfiguration configuration,
- final MySqlSslConfiguration ssl,
- final LazyQueryCache queryCache,
- final SocketAddress address,
- final String user,
- @Nullable final CharSequence password
- ) {
- return Mono.fromSupplier(() -> {
- ZoneId connectionTimeZone = retrieveZoneId(configuration.getConnectionTimeZone());
- return new ConnectionContext(
- configuration.getZeroDateOption(),
- configuration.getLoadLocalInfilePath(),
- configuration.getLocalInfileBufferSize(),
- configuration.isPreserveInstants(),
- connectionTimeZone
- );
- }).flatMap(context -> Client.connect(
- ssl,
- address,
- configuration.isTcpKeepAlive(),
- configuration.isTcpNoDelay(),
- context,
- configuration.getConnectTimeout(),
- configuration.getLoopResources()
- )).flatMap(client -> {
- // Lazy init database after handshake/login
- boolean deferDatabase = configuration.isCreateDatabaseIfNotExist();
- String database = configuration.getDatabase();
- String loginDb = deferDatabase ? "" : database;
- String sessionDb = deferDatabase ? database : "";
-
- return InitFlow.initHandshake(
- client,
- ssl.getSslMode(),
- loginDb,
- user,
- password,
- configuration.getCompressionAlgorithms(),
- configuration.getZstdCompressionLevel()
- ).then(InitFlow.initSession(
- client,
- sessionDb,
- configuration.getPrepareCacheSize(),
- configuration.getSessionVariables(),
- configuration.isForceConnectionTimeZoneToSession(),
- configuration.getLockWaitTimeout(),
- configuration.getStatementTimeout(),
- configuration.getExtensions()
- )).map(codecs -> new MySqlSimpleConnection(
- client,
- codecs,
- queryCache.get(),
- configuration.getPreferPrepareStatement()
- )).onErrorResume(e -> client.forceClose().then(Mono.error(e)));
- });
- }
-
- @Nullable
- private static ZoneId retrieveZoneId(String timeZone) {
- if ("LOCAL".equalsIgnoreCase(timeZone)) {
- return ZoneId.systemDefault().normalized();
- } else if ("SERVER".equalsIgnoreCase(timeZone)) {
- return null;
- }
-
- return StringUtils.parseZoneId(timeZone);
+ return new MySqlConnectionFactory(Mono.defer(() -> configuration.getSocket()
+ .strategy(configuration)
+ .connect()
+ .flatMap(client -> {
+ String sessionDb = configuration.isCreateDatabaseIfNotExist() ? configuration.getDatabase() : "";
+
+ return InitFlow.initSession(
+ client,
+ sessionDb,
+ configuration.getPrepareCacheSize(),
+ configuration.getSessionVariables(),
+ configuration.isForceConnectionTimeZoneToSession(),
+ configuration.getLockWaitTimeout(),
+ configuration.getStatementTimeout(),
+ configuration.getExtensions()
+ ).map(codecs -> new MySqlSimpleConnection(
+ client,
+ codecs,
+ queryCache.get(),
+ configuration.getPreferPrepareStatement()
+ )).onErrorResume(e -> client.close().then(Mono.error(e)));
+ })));
}
private static final class LazyQueryCache implements Supplier {
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java
index f6dc1a57a..ad71be06d 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java
@@ -17,8 +17,12 @@
package io.asyncer.r2dbc.mysql;
import io.asyncer.r2dbc.mysql.constant.CompressionAlgorithm;
+import io.asyncer.r2dbc.mysql.constant.ProtocolDriver;
+import io.asyncer.r2dbc.mysql.constant.HaProtocol;
import io.asyncer.r2dbc.mysql.constant.SslMode;
import io.asyncer.r2dbc.mysql.constant.ZeroDateOption;
+import io.asyncer.r2dbc.mysql.internal.NodeAddress;
+import io.asyncer.r2dbc.mysql.internal.util.AddressUtils;
import io.netty.handler.ssl.SslContextBuilder;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
@@ -45,6 +49,7 @@
import static io.r2dbc.spi.ConnectionFactoryOptions.LOCK_WAIT_TIMEOUT;
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD;
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
+import static io.r2dbc.spi.ConnectionFactoryOptions.PROTOCOL;
import static io.r2dbc.spi.ConnectionFactoryOptions.SSL;
import static io.r2dbc.spi.ConnectionFactoryOptions.STATEMENT_TIMEOUT;
import static io.r2dbc.spi.ConnectionFactoryOptions.USER;
@@ -54,11 +59,6 @@
*/
public final class MySqlConnectionFactoryProvider implements ConnectionFactoryProvider {
- /**
- * The name of the driver used for discovery, should not be changed.
- */
- public static final String MYSQL_DRIVER = "mysql";
-
/**
* Option to set the Unix Domain Socket.
*
@@ -67,8 +67,20 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
public static final Option UNIX_SOCKET = Option.valueOf("unixSocket");
/**
- * Option to set the time zone conversion. Default to {@code true} means enable conversion between JVM
- * and {@link #CONNECTION_TIME_ZONE}.
+ * Option to whether to perform failover reconnection. Default to {@code false}.
+ *
+ * It is not recommended due to it may lead to unexpected results. For example, it may recover a transaction state
+ * from a failed server node to an available node, the user can not aware of it, and continuing to execute more
+ * queries in the transaction will lead to unexpected inconsistencies or errors. Or, user set a self-defined
+ * variable in the session, it may not be recovered to the new node due to the driver can not aware of it.
+ *
+ * @since 1.2.0
+ */
+ public static final Option AUTO_RECONNECT = Option.valueOf("autoReconnect");
+
+ /**
+ * Option to set the time zone conversion. Default to {@code true} means enable conversion between JVM and
+ * {@link #CONNECTION_TIME_ZONE}.
*
* Note: disable it will ignore the time zone of connection, and use the JVM local time zone.
*
@@ -77,9 +89,9 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
public static final Option PRESERVE_INSTANTS = Option.valueOf("preserveInstants");
/**
- * Option to set the time zone of connection. Default to {@code LOCAL} means use JVM local time zone.
- * It should be {@code "LOCAL"}, {@code "SERVER"}, or a valid ID of {@code ZoneId}. {@code "SERVER"} means
- * querying the server-side timezone during initialization.
+ * Option to set the time zone of connection. Default to {@code LOCAL} means use JVM local time zone. It should be
+ * {@code "LOCAL"}, {@code "SERVER"}, or a valid ID of {@code ZoneId}. {@code "SERVER"} means querying the
+ * server-side timezone during initialization.
*
* @since 1.1.2
*/
@@ -88,8 +100,8 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
/**
* Option to force the time zone of connection to session time zone. Default to {@code false}.
*
- * Note: alter the time zone of session will affect the results of MySQL date/time functions, e.g.
- * {@code NOW([n])}, {@code CURRENT_TIME([n])}, {@code CURRENT_DATE()}, etc. Please use with caution.
+ * Note: alter the time zone of session will affect the results of MySQL date/time functions, e.g. {@code NOW([n])},
+ * {@code CURRENT_TIME([n])}, {@code CURRENT_DATE()}, etc. Please use with caution.
*
* @since 1.1.2
*/
@@ -97,8 +109,7 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
Option.valueOf("forceConnectionTimeZoneToSession");
/**
- * Option to set {@link ZoneId} of server. If it is set, driver will ignore the real time zone of
- * server-side.
+ * Option to set {@link ZoneId} of server. If it is set, driver will ignore the real time zone of server-side.
*
* @since 0.8.2
* @deprecated since 1.1.2, use {@link #CONNECTION_TIME_ZONE} instead.
@@ -122,8 +133,8 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
/**
* Option to configure {@link HostnameVerifier}. It is available only if the {@link #SSL_MODE} set to
- * {@link SslMode#VERIFY_IDENTITY}. It can be an implementation class name of {@link HostnameVerifier}
- * with a public no-args constructor.
+ * {@link SslMode#VERIFY_IDENTITY}. It can be an implementation class name of {@link HostnameVerifier} with a public
+ * no-args constructor.
*
* @since 0.8.2
*/
@@ -131,17 +142,17 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
Option.valueOf("sslHostnameVerifier");
/**
- * Option to TLS versions for SslContext protocols, see also {@code TlsVersions}. Usually sorted from
- * higher to lower. It can be a {@code Collection}. It can be a {@link String}, protocols will be
- * split by {@code ,}. e.g. "TLSv1.2,TLSv1.1,TLSv1".
+ * Option to TLS versions for SslContext protocols, see also {@code TlsVersions}. Usually sorted from higher to
+ * lower. It can be a {@code Collection}. It can be a {@link String}, protocols will be split by {@code ,}.
+ * e.g. "TLSv1.2,TLSv1.1,TLSv1".
*
* @since 0.8.1
*/
public static final Option TLS_VERSION = Option.valueOf("tlsVersion");
/**
- * Option to set a PEM file of server SSL CA. It will be used to verify server certificates. And it will
- * be used only if {@link #SSL_MODE} set to {@link SslMode#VERIFY_CA} or higher level.
+ * Option to set a PEM file of server SSL CA. It will be used to verify server certificates. And it will be used
+ * only if {@link #SSL_MODE} set to {@link SslMode#VERIFY_CA} or higher level.
*
* @since 0.8.1
*/
@@ -170,8 +181,8 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
public static final Option SSL_CERT = Option.valueOf("sslCert");
/**
- * Option to custom {@link SslContextBuilder}. It can be an implementation class name of {@link Function}
- * with a public no-args constructor.
+ * Option to custom {@link SslContextBuilder}. It can be an implementation class name of {@link Function} with a
+ * public no-args constructor.
*
* @since 0.8.2
*/
@@ -203,18 +214,17 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
/**
* Enable server preparing for parameterized statements and prefer server preparing simple statements.
*
- * The value can be a {@link Boolean}. If it is {@code true}, driver will use server preparing for
- * parameterized statements and text query for simple statements. If it is {@code false}, driver will use
- * client preparing for parameterized statements and text query for simple statements.
+ * The value can be a {@link Boolean}. If it is {@code true}, driver will use server preparing for parameterized
+ * statements and text query for simple statements. If it is {@code false}, driver will use client preparing for
+ * parameterized statements and text query for simple statements.
*
- * The value can be a {@link Predicate}{@code <}{@link String}{@code >}. If it is set, driver will server
- * preparing for parameterized statements, it configures whether to prefer prepare execution on a
- * statement-by-statement basis (simple statements). The {@link Predicate}{@code <}{@link String}{@code >}
- * accepts the simple SQL query string and returns a {@code boolean} flag indicating preference.
+ * The value can be a {@link Predicate}{@code <}{@link String}{@code >}. If it is set, driver will server preparing
+ * for parameterized statements, it configures whether to prefer prepare execution on a statement-by-statement basis
+ * (simple statements). The {@link Predicate}{@code <}{@link String}{@code >} accepts the simple SQL query string
+ * and returns a {@code boolean} flag indicating preference.
*
- * The value can be a {@link String}. If it is set, driver will try to convert it to {@link Boolean} or an
- * instance of {@link Predicate}{@code <}{@link String}{@code >} which use reflection with a public
- * no-args constructor.
+ * The value can be a {@link String}. If it is set, driver will try to convert it to {@link Boolean} or an instance
+ * of {@link Predicate}{@code <}{@link String}{@code >} which use reflection with a public no-args constructor.
*
* @since 0.8.1
*/
@@ -248,9 +258,9 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
/**
* Option to set compression algorithms. Default to [{@link CompressionAlgorithm#UNCOMPRESSED}].
*
- * It will auto choose an algorithm that's contained in the list and supported by the server, preferring
- * zstd, then zlib. If the list does not contain {@link CompressionAlgorithm#UNCOMPRESSED} and the server
- * does not support any algorithm in the list, an exception will be thrown when connecting.
+ * It will auto choose an algorithm that's contained in the list and supported by the server, preferring zstd, then
+ * zlib. If the list does not contain {@link CompressionAlgorithm#UNCOMPRESSED} and the server does not support any
+ * algorithm in the list, an exception will be thrown when connecting.
*
* Note: zstd requires a dependency {@code com.github.luben:zstd-jni}.
*
@@ -264,8 +274,7 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
*
* It is only used if zstd is chosen for the connection.
*
- * Note: MySQL protocol does not allow to set the zlib compression level of the server, only zstd is
- * configurable.
+ * Note: MySQL protocol does not allow to set the zlib compression level of the server, only zstd is configurable.
*
* @since 1.1.2
*/
@@ -302,9 +311,9 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
public static final Option AUTODETECT_EXTENSIONS = Option.valueOf("autodetectExtensions");
/**
- * Password Publisher function can be used to retrieve password before creating a connection. This can be
- * used with Amazon RDS Aurora IAM authentication, wherein it requires token to be generated. The token is
- * valid for 15 minutes, and this token will be used as password.
+ * Password Publisher function can be used to retrieve password before creating a connection. This can be used with
+ * Amazon RDS Aurora IAM authentication, wherein it requires token to be generated. The token is valid for 15
+ * minutes, and this token will be used as password.
*/
public static final Option> PASSWORD_PUBLISHER = Option.valueOf("passwordPublisher");
@@ -318,12 +327,14 @@ public ConnectionFactory create(ConnectionFactoryOptions options) {
@Override
public boolean supports(ConnectionFactoryOptions options) {
requireNonNull(options, "connectionFactoryOptions must not be null");
- return MYSQL_DRIVER.equals(options.getValue(DRIVER));
+
+ Object driver = options.getValue(DRIVER);
+ return driver instanceof String && ProtocolDriver.supports((String) driver);
}
@Override
public String getDriver() {
- return MYSQL_DRIVER;
+ return ProtocolDriver.standardDriver();
}
/**
@@ -340,18 +351,30 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
.to(builder::user);
mapper.optional(PASSWORD).asPassword()
.to(builder::password);
- mapper.optional(UNIX_SOCKET).asString()
- .to(builder::unixSocket)
- .otherwise(() -> setupHost(builder, mapper));
+
+ boolean unixSocket = mapper.optional(UNIX_SOCKET).asString()
+ .to(builder::unixSocket);
+
+ if (!unixSocket) {
+ setupHost(builder, mapper);
+ }
+
mapper.optional(PRESERVE_INSTANTS).asBoolean()
.to(builder::preserveInstants);
- mapper.optional(CONNECTION_TIME_ZONE).asString()
- .to(builder::connectionTimeZone)
- .otherwise(() -> mapper.optional(SERVER_ZONE_ID)
+
+ boolean connectionTimeZone = mapper.optional(CONNECTION_TIME_ZONE).asString()
+ .to(builder::connectionTimeZone);
+
+ if (!connectionTimeZone) {
+ mapper.optional(SERVER_ZONE_ID)
.as(ZoneId.class, id -> ZoneId.of(id, ZoneId.SHORT_IDS))
- .to(builder::serverZoneId));
+ .to(builder::serverZoneId);
+ }
+
mapper.optional(FORCE_CONNECTION_TIME_ZONE_TO_SESSION).asBoolean()
.to(builder::forceConnectionTimeZoneToSession);
+ mapper.optional(AUTO_RECONNECT).asBoolean()
+ .to(builder::autoReconnect);
mapper.optional(TCP_KEEP_ALIVE).asBoolean()
.to(builder::tcpKeepAlive);
mapper.optional(TCP_NO_DELAY).asBoolean()
@@ -388,7 +411,7 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
mapper.optional(LOOP_RESOURCES).as(LoopResources.class)
.to(builder::loopResources);
mapper.optional(PASSWORD_PUBLISHER).as(Publisher.class)
- .to(builder::passwordPublisher);
+ .to(builder::password);
mapper.optional(SESSION_VARIABLES).asArray(
String[].class,
Function.identity(),
@@ -404,17 +427,44 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
}
/**
- * Set builder of {@link MySqlConnectionConfiguration} for hostname-based address with SSL
- * configurations.
+ * Set builder of {@link MySqlConnectionConfiguration} for hostname-based path with SSL configurations.
*
* @param builder the builder of {@link MySqlConnectionConfiguration}.
* @param mapper the {@link OptionMapper} of {@code options}.
*/
private static void setupHost(MySqlConnectionConfiguration.Builder builder, OptionMapper mapper) {
- mapper.requires(HOST).asString()
- .to(builder::host);
- mapper.optional(PORT).asInt()
+ boolean port = mapper.optional(PORT).asInt()
.to(builder::port);
+
+ if (port) {
+ // If port is set, host must be a single host.
+ mapper.requires(HOST).asString()
+ .to(builder::host);
+ } else {
+ // If port is not set, host can be a single host or multiple hosts.
+ // If the URI contains an underscore in the host, it will produce an incorrectly resolved host and port.
+ // e.g. "r2dbc:mysql://my_db:3306" will be resolved to "my_db:3306" as host and null as port.
+ // See https://github.com/asyncer-io/r2dbc-mysql/issues/255
+ mapper.requires(HOST)
+ .asArray(String[].class, Function.identity(), it -> it.split(","), String[]::new)
+ .to(hosts -> {
+ if (hosts.length == 1) {
+ builder.host(hosts[0]);
+ return;
+ }
+
+ for (String host : hosts) {
+ NodeAddress address = AddressUtils.parseAddress(host);
+
+ builder.addHost(address.getHost(), address.getPort());
+ }
+ });
+ }
+
+ mapper.requires(DRIVER).as(ProtocolDriver.class, ProtocolDriver::from)
+ .to(builder::driver);
+ mapper.optional(PROTOCOL).as(HaProtocol.class, HaProtocol::from)
+ .to(builder::protocol);
mapper.optional(SSL).asBoolean()
.to(isSsl -> builder.sslMode(isSsl ? SslMode.REQUIRED : SslMode.DISABLED));
mapper.optional(SSL_MODE).as(SslMode.class, id -> SslMode.valueOf(id.toUpperCase()))
@@ -437,12 +487,12 @@ private static void setupHost(MySqlConnectionConfiguration.Builder builder, Opti
}
/**
- * Splits session variables from user input. e.g. {@code sql_mode='ANSI_QUOTE,STRICT',c=d;e=f} will be
- * split into {@code ["sql_mode='ANSI_QUOTE,STRICT'", "c=d", "e=f"]}.
+ * Splits session variables from user input. e.g. {@code sql_mode='ANSI_QUOTE,STRICT',c=d;e=f} will be split into
+ * {@code ["sql_mode='ANSI_QUOTE,STRICT'", "c=d", "e=f"]}.
*
- * It supports escaping characters with backslash, quoted values with single or double quotes, and nested
- * brackets. Priorities are: backslash in quoted > single quote = double quote > bracket, backslash
- * will not be a valid escape character if it is not in a quoted value.
+ * It supports escaping characters with backslash, quoted values with single or double quotes, and nested brackets.
+ * Priorities are: backslash in quoted > single quote = double quote > bracket, backslash will not be a valid
+ * escape character if it is not in a quoted value.
*
* Note that it does not strictly check syntax validity, so it will not throw syntax exceptions.
*
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlSslConfiguration.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlSslConfiguration.java
index d76662f40..00dd10cfb 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlSslConfiguration.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlSslConfiguration.java
@@ -17,6 +17,7 @@
package io.asyncer.r2dbc.mysql;
import io.asyncer.r2dbc.mysql.constant.SslMode;
+import io.asyncer.r2dbc.mysql.internal.util.InternalArrays;
import io.netty.handler.ssl.SslContextBuilder;
import org.jetbrains.annotations.Nullable;
@@ -25,7 +26,7 @@
import java.util.Objects;
import java.util.function.Function;
-import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
+import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.require;
import static io.asyncer.r2dbc.mysql.internal.util.InternalArrays.EMPTY_STRINGS;
/**
@@ -106,8 +107,8 @@ public String getSslCert() {
}
/**
- * Customizes a {@link SslContextBuilder} that customizer was specified by configuration, or do nothing if
- * the customizer was not set.
+ * Customizes a {@link SslContextBuilder} that customizer was specified by configuration, or do nothing if the
+ * customizer was not set.
*
* @param builder the {@link SslContextBuilder}.
* @return the {@code builder}.
@@ -162,19 +163,87 @@ static MySqlSslConfiguration disabled() {
return DISABLED;
}
- static MySqlSslConfiguration create(SslMode sslMode, String[] tlsVersion,
- @Nullable HostnameVerifier sslHostnameVerifier, @Nullable String sslCa, @Nullable String sslKey,
- @Nullable CharSequence sslKeyPassword, @Nullable String sslCert,
- @Nullable Function sslContextBuilderCustomizer) {
- requireNonNull(sslMode, "sslMode must not be null");
+ static final class Builder {
- if (sslMode == SslMode.DISABLED) {
- return DISABLED;
+ @Nullable
+ private SslMode sslMode;
+
+ private String[] tlsVersions = InternalArrays.EMPTY_STRINGS;
+
+ @Nullable
+ private HostnameVerifier sslHostnameVerifier;
+
+ @Nullable
+ private String sslCa;
+
+ @Nullable
+ private String sslKey;
+
+ @Nullable
+ private CharSequence sslKeyPassword;
+
+ @Nullable
+ private String sslCert;
+
+ @Nullable
+ private Function sslContextBuilderCustomizer;
+
+ void sslMode(SslMode sslMode) {
+ this.sslMode = sslMode;
}
- requireNonNull(tlsVersion, "tlsVersion must not be null");
+ void tlsVersions(String[] tlsVersions) {
+ int size = tlsVersions.length;
+
+ if (size > 0) {
+ String[] versions = new String[size];
+ System.arraycopy(tlsVersions, 0, versions, 0, size);
+ this.tlsVersions = versions;
+ } else {
+ this.tlsVersions = EMPTY_STRINGS;
+ }
+ }
+
+ void sslHostnameVerifier(HostnameVerifier sslHostnameVerifier) {
+ this.sslHostnameVerifier = sslHostnameVerifier;
+ }
- return new MySqlSslConfiguration(sslMode, tlsVersion, sslHostnameVerifier, sslCa, sslKey,
- sslKeyPassword, sslCert, sslContextBuilderCustomizer);
+ void sslCa(@Nullable String sslCa) {
+ this.sslCa = sslCa;
+ }
+
+ void sslCert(@Nullable String sslCert) {
+ this.sslCert = sslCert;
+ }
+
+ void sslKey(@Nullable String sslKey) {
+ this.sslKey = sslKey;
+ }
+
+ void sslKeyPassword(@Nullable CharSequence sslKeyPassword) {
+ this.sslKeyPassword = sslKeyPassword;
+ }
+
+ void sslContextBuilderCustomizer(Function customizer) {
+ this.sslContextBuilderCustomizer = customizer;
+ }
+
+ MySqlSslConfiguration build(boolean preferred) {
+ SslMode sslMode = this.sslMode;
+
+ if (sslMode == null) {
+ sslMode = preferred ? SslMode.PREFERRED : SslMode.DISABLED;
+ }
+
+ if (sslMode == SslMode.DISABLED) {
+ return DISABLED;
+ }
+
+ require((sslCert == null && sslKey == null) || (sslCert != null && sslKey != null),
+ "sslCert and sslKey must be both null or both non-null");
+
+ return new MySqlSslConfiguration(sslMode, tlsVersions, sslHostnameVerifier, sslCa, sslKey,
+ sslKeyPassword, sslCert, sslContextBuilderCustomizer);
+ }
}
}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlStatementSupport.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlStatementSupport.java
index 5b40500ee..7fd8cfbc1 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlStatementSupport.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlStatementSupport.java
@@ -46,12 +46,11 @@ abstract class MySqlStatementSupport implements MySqlStatement {
public final MySqlStatement returnGeneratedValues(String... columns) {
requireNonNull(columns, "columns must not be null");
- ConnectionContext context = client.getContext();
int len = columns.length;
if (len == 0) {
this.generatedColumns = InternalArrays.EMPTY_STRINGS;
- } else if (len == 1 || supportReturning(context)) {
+ } else if (len == 1 || supportReturning(client.getContext())) {
String[] result = new String[len];
for (int i = 0; i < len; ++i) {
@@ -61,7 +60,7 @@ public final MySqlStatement returnGeneratedValues(String... columns) {
this.generatedColumns = result;
} else {
- String db = context.isMariaDb() ? "MariaDB 10.5.0 or below" : "MySQL";
+ String db = client.getContext().isMariaDb() ? "MariaDB 10.5.0 or below" : "MySQL";
throw new IllegalArgumentException(db + " can have only one column");
}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/OptionMapper.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/OptionMapper.java
index f75a913f1..afc67a8bb 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/OptionMapper.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/OptionMapper.java
@@ -60,14 +60,13 @@ private Source(@Nullable T value) {
this.value = value;
}
- Otherwise to(Consumer super T> consumer) {
+ boolean to(Consumer super T> consumer) {
if (value == null) {
- return Otherwise.FALL;
+ return false;
}
consumer.accept(value);
-
- return Otherwise.NOOP;
+ return true;
}
Source as(Class type) {
@@ -268,27 +267,3 @@ private static O[] mapArray(String[] input, Function mapper, IntF
return output;
}
}
-
-enum Otherwise {
-
- NOOP {
- @Override
- void otherwise(Runnable runnable) {
- // Do nothing
- }
- },
-
- FALL {
- @Override
- void otherwise(Runnable runnable) {
- runnable.run();
- }
- };
-
- /**
- * Invoked if the previous {@link Source} outcome did not match.
- *
- * @param runnable the {@link Runnable} that should be invoked.
- */
- abstract void otherwise(Runnable runnable);
-}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SingleHostConnectionStrategy.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SingleHostConnectionStrategy.java
new file mode 100644
index 000000000..5790e21c4
--- /dev/null
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SingleHostConnectionStrategy.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2024 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql;
+
+import io.asyncer.r2dbc.mysql.client.Client;
+import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
+import io.asyncer.r2dbc.mysql.internal.NodeAddress;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+
+/**
+ * An implementation of {@link ConnectionStrategy} that connects to a single host. It can be wrapped to a
+ * FailoverClient.
+ */
+final class SingleHostConnectionStrategy implements ConnectionStrategy {
+
+ private final Mono client;
+
+ SingleHostConnectionStrategy(
+ MySqlConnectionConfiguration configuration,
+ NodeAddress address,
+ boolean tcpKeepAlive,
+ boolean tcpNoDelay
+ ) {
+ this.client = configuration.getCredential().flatMap(credential -> {
+ logger.debug("Connect to a single host: {}", address);
+
+ InetConnectFunction login = new InetConnectFunction(
+ true,
+ tcpKeepAlive,
+ tcpNoDelay,
+ credential,
+ configuration
+ );
+
+ return connectHost(login, address, 0, 3);
+ });
+ }
+
+ @Override
+ public Mono connect() {
+ return client;
+ }
+
+ private static Mono connectHost(
+ InetConnectFunction login,
+ NodeAddress address,
+ int attempts,
+ int maxAttempts
+ ) {
+ return login.apply(address::toUnresolved)
+ .onErrorResume(t -> resumeConnect(t, address, login, attempts, maxAttempts));
+ }
+
+ private static Mono resumeConnect(
+ Throwable t,
+ NodeAddress address,
+ InetConnectFunction login,
+ int attempts,
+ int maxAttempts
+ ) {
+ logger.warn("Fail to connect to {}", address, t);
+
+ if (attempts >= maxAttempts) {
+ return Mono.error(ConnectionStrategy.retryFail(
+ "Fail to establish connection, retried " + attempts + " times", t));
+ }
+
+ logger.warn("Failed to establish connection, auto-try again after 250ms.", t);
+
+ // Ignore waiting error, e.g. interrupted, scheduler rejected
+ return Mono.delay(Duration.ofMillis(250))
+ .onErrorComplete()
+ .then(Mono.defer(() -> connectHost(
+ login,
+ address,
+ attempts + 1,
+ maxAttempts
+ )));
+ }
+}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SocketClientConfiguration.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SocketClientConfiguration.java
new file mode 100644
index 000000000..3102e345b
--- /dev/null
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SocketClientConfiguration.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2024 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql;
+
+import org.jetbrains.annotations.Nullable;
+import reactor.netty.resources.LoopResources;
+
+import java.time.Duration;
+import java.util.Objects;
+
+/**
+ * A general-purpose configuration for a socket client. The client can be a TCP client or a Unix Domain Socket client.
+ */
+final class SocketClientConfiguration {
+
+ @Nullable
+ private final Duration connectTimeout;
+
+ @Nullable
+ private final LoopResources loopResources;
+
+ SocketClientConfiguration(@Nullable Duration connectTimeout, @Nullable LoopResources loopResources) {
+ this.connectTimeout = connectTimeout;
+ this.loopResources = loopResources;
+ }
+
+ @Nullable
+ Duration getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ @Nullable
+ LoopResources getLoopResources() {
+ return loopResources;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SocketClientConfiguration)) {
+ return false;
+ }
+
+ SocketClientConfiguration that = (SocketClientConfiguration) o;
+
+ return Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(loopResources, that.loopResources);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * Objects.hashCode(connectTimeout) + Objects.hashCode(loopResources);
+ }
+
+ @Override
+ public String toString() {
+ return "Client{connectTimeout=" + connectTimeout + ", loopResources=" + loopResources + '}';
+ }
+
+ static final class Builder {
+
+ @Nullable
+ private Duration connectTimeout;
+
+ @Nullable
+ private LoopResources loopResources;
+
+ void connectTimeout(@Nullable Duration connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ }
+
+ void loopResources(@Nullable LoopResources loopResources) {
+ this.loopResources = loopResources;
+ }
+
+ SocketClientConfiguration build() {
+ return new SocketClientConfiguration(connectTimeout, loopResources);
+ }
+ }
+}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SocketConfiguration.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SocketConfiguration.java
new file mode 100644
index 000000000..de317ddde
--- /dev/null
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SocketConfiguration.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2024 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql;
+
+/**
+ * A sealed interface for socket configuration, it is also a factory for creating {@link ConnectionStrategy}.
+ *
+ * @see TcpSocketConfiguration
+ * @see UnixDomainSocketConfiguration
+ */
+interface SocketConfiguration {
+
+ ConnectionStrategy strategy(MySqlConnectionConfiguration configuration);
+}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/TcpSocketConfiguration.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/TcpSocketConfiguration.java
new file mode 100644
index 000000000..e3ea6bb1f
--- /dev/null
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/TcpSocketConfiguration.java
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2024 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql;
+
+import io.asyncer.r2dbc.mysql.constant.HaProtocol;
+import io.asyncer.r2dbc.mysql.constant.ProtocolDriver;
+import io.asyncer.r2dbc.mysql.internal.NodeAddress;
+import io.asyncer.r2dbc.mysql.internal.util.InternalArrays;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.require;
+import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonEmpty;
+
+/**
+ * A configuration for a TCP/SSL socket.
+ */
+final class TcpSocketConfiguration implements SocketConfiguration {
+
+ private static final int DEFAULT_PORT = 3306;
+
+ private final ProtocolDriver driver;
+
+ private final HaProtocol protocol;
+
+ private final List addresses;
+
+ private final int retriesAllDown;
+
+ private final boolean tcpKeepAlive;
+
+ private final boolean tcpNoDelay;
+
+ TcpSocketConfiguration(
+ ProtocolDriver driver,
+ HaProtocol protocol,
+ List addresses,
+ int retriesAllDown,
+ boolean tcpKeepAlive,
+ boolean tcpNoDelay
+ ) {
+ this.driver = driver;
+ this.protocol = protocol;
+ this.addresses = addresses;
+ this.retriesAllDown = retriesAllDown;
+ this.tcpKeepAlive = tcpKeepAlive;
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+ ProtocolDriver getDriver() {
+ return driver;
+ }
+
+ HaProtocol getProtocol() {
+ return protocol;
+ }
+
+ NodeAddress getFirstAddress() {
+ if (addresses.isEmpty()) {
+ throw new IllegalStateException("No endpoints configured");
+ }
+ return addresses.get(0);
+ }
+
+ List getAddresses() {
+ return addresses;
+ }
+
+ int getRetriesAllDown() {
+ return retriesAllDown;
+ }
+
+ boolean isTcpKeepAlive() {
+ return tcpKeepAlive;
+ }
+
+ boolean isTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof TcpSocketConfiguration)) {
+ return false;
+ }
+
+ TcpSocketConfiguration that = (TcpSocketConfiguration) o;
+
+ return tcpKeepAlive == that.tcpKeepAlive &&
+ tcpNoDelay == that.tcpNoDelay &&
+ driver == that.driver &&
+ protocol == that.protocol &&
+ retriesAllDown == that.retriesAllDown &&
+ addresses.equals(that.addresses);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = driver.hashCode();
+
+ result = 31 * result + protocol.hashCode();
+ result = 31 * result + addresses.hashCode();
+ result = 31 * result + retriesAllDown;
+ result = 31 * result + (tcpKeepAlive ? 1 : 0);
+
+ return 31 * result + (tcpNoDelay ? 1 : 0);
+ }
+
+ @Override
+ public String toString() {
+ return "TCP{driver=" + driver +
+ ", protocol=" + protocol +
+ ", addresses=" + addresses +
+ ", retriesAllDown=" + retriesAllDown +
+ ", tcpKeepAlive=" + tcpKeepAlive +
+ ", tcpNoDelay=" + tcpNoDelay +
+ '}';
+ }
+
+ static final class Builder {
+
+ private ProtocolDriver driver = ProtocolDriver.MYSQL;
+
+ private HaProtocol protocol = HaProtocol.DEFAULT;
+
+ private final List addresses = new ArrayList<>();
+
+ private String host = "";
+
+ private int port = DEFAULT_PORT;
+
+ private boolean tcpKeepAlive = false;
+
+ private boolean tcpNoDelay = true;
+
+ private int retriesAllDown = 10;
+
+ void driver(ProtocolDriver driver) {
+ this.driver = driver;
+ }
+
+ void protocol(HaProtocol protocol) {
+ this.protocol = protocol;
+ }
+
+ void host(String host) {
+ this.host = host;
+ }
+
+ void port(int port) {
+ this.port = port;
+ }
+
+ void addHost(String host, int port) {
+ this.addresses.add(new NodeAddress(host, port));
+ }
+
+ void addHost(String host) {
+ this.addresses.add(new NodeAddress(host));
+ }
+
+ void retriesAllDown(int retriesAllDown) {
+ this.retriesAllDown = retriesAllDown;
+ }
+
+ void tcpKeepAlive(boolean tcpKeepAlive) {
+ this.tcpKeepAlive = tcpKeepAlive;
+ }
+
+ void tcpNoDelay(boolean tcpNoDelay) {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+ TcpSocketConfiguration build() {
+ List addresses;
+
+ if (this.addresses.isEmpty()) {
+ requireNonEmpty(host, "Either single host or multiple hosts must be configured");
+
+ addresses = Collections.singletonList(new NodeAddress(host, port));
+ } else {
+ require(host.isEmpty(), "Either single host or multiple hosts must be configured");
+
+ addresses = InternalArrays.asImmutableList(this.addresses.toArray(new NodeAddress[0]));
+ }
+
+ return new TcpSocketConfiguration(
+ driver,
+ protocol,
+ addresses,
+ retriesAllDown,
+ tcpKeepAlive,
+ tcpNoDelay);
+ }
+ }
+
+ @Override
+ public ConnectionStrategy strategy(MySqlConnectionConfiguration configuration) {
+ switch (protocol) {
+ case REPLICATION:
+ ConnectionStrategy.logger.warn(
+ "R2DBC Connection cannot be set to read-only, replication protocol will use the first host");
+ return new MultiHostsConnectionStrategy(
+ configuration,
+ Collections.singletonList(getFirstAddress()),
+ driver,
+ retriesAllDown,
+ false,
+ tcpKeepAlive,
+ tcpNoDelay
+ );
+ case SEQUENTIAL:
+ return new MultiHostsConnectionStrategy(
+ configuration,
+ addresses,
+ driver,
+ retriesAllDown,
+ false,
+ tcpKeepAlive,
+ tcpNoDelay
+ );
+ case LOAD_BALANCE:
+ return new MultiHostsConnectionStrategy(
+ configuration,
+ addresses,
+ driver,
+ retriesAllDown,
+ true,
+ tcpKeepAlive,
+ tcpNoDelay
+ );
+ default:
+ if (ProtocolDriver.MYSQL == driver && addresses.size() == 1) {
+ return new SingleHostConnectionStrategy(configuration, getFirstAddress(), tcpKeepAlive, tcpNoDelay);
+ } else {
+ return new MultiHostsConnectionStrategy(
+ configuration,
+ addresses,
+ driver,
+ retriesAllDown,
+ false,
+ tcpKeepAlive,
+ tcpNoDelay
+ );
+ }
+ }
+ }
+}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/UnixDomainSocketConfiguration.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/UnixDomainSocketConfiguration.java
new file mode 100644
index 000000000..7d71f1d85
--- /dev/null
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/UnixDomainSocketConfiguration.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2024 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql;
+
+/**
+ * A configuration for a Unix Domain Socket.
+ */
+final class UnixDomainSocketConfiguration implements SocketConfiguration {
+
+ private final String path;
+
+ UnixDomainSocketConfiguration(String path) {
+ this.path = path;
+ }
+
+ String getPath() {
+ return this.path;
+ }
+
+ @Override
+ public ConnectionStrategy strategy(MySqlConnectionConfiguration configuration) {
+ return new UnixDomainSocketConnectionStrategy(this, configuration);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof UnixDomainSocketConfiguration)) {
+ return false;
+ }
+
+ UnixDomainSocketConfiguration that = (UnixDomainSocketConfiguration) o;
+
+ return path.equals(that.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "UnixDomainSocket{path='" + path + "'}";
+ }
+
+ static final class Builder {
+
+ private String path;
+
+ void path(String path) {
+ this.path = path;
+ }
+
+ UnixDomainSocketConfiguration build() {
+ return new UnixDomainSocketConfiguration(path);
+ }
+ }
+}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/UnixDomainSocketConnectionStrategy.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/UnixDomainSocketConnectionStrategy.java
new file mode 100644
index 000000000..f9ced48e3
--- /dev/null
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/UnixDomainSocketConnectionStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2024 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql;
+
+import io.asyncer.r2dbc.mysql.client.Client;
+import io.netty.channel.unix.DomainSocketAddress;
+import reactor.core.publisher.Mono;
+import reactor.netty.tcp.TcpClient;
+
+/**
+ * An implementation of {@link ConnectionStrategy} that connects to a Unix Domain Socket.
+ */
+final class UnixDomainSocketConnectionStrategy implements ConnectionStrategy {
+
+ private final Mono client;
+
+ UnixDomainSocketConnectionStrategy(
+ UnixDomainSocketConfiguration socket,
+ MySqlConnectionConfiguration configuration
+ ) {
+ this.client = configuration.getCredential().flatMap(credential -> {
+ String path = socket.getPath();
+ TcpClient tcpClient = ConnectionStrategy.createTcpClient(configuration.getClient(), false)
+ .remoteAddress(() -> new DomainSocketAddress(path));
+
+ return ConnectionStrategy.connectWithInit(tcpClient, credential, configuration);
+ });
+ }
+
+ @Override
+ public Mono connect() {
+ return client;
+ }
+}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java
index d7c3ac28a..1b570a933 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java
@@ -17,27 +17,17 @@
package io.asyncer.r2dbc.mysql.client;
import io.asyncer.r2dbc.mysql.ConnectionContext;
-import io.asyncer.r2dbc.mysql.MySqlSslConfiguration;
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelOption;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
-import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
-import reactor.netty.resources.LoopResources;
-import reactor.netty.tcp.TcpClient;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.time.Duration;
import java.util.function.BiConsumer;
-import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
-
/**
* An abstraction that wraps the networking part of exchanging methods.
*/
@@ -105,52 +95,4 @@ public interface Client {
* @return if connection is valid
*/
boolean isConnected();
-
- /**
- * Sends a signal to the connection, which means server does not support SSL.
- */
- void sslUnsupported();
-
- /**
- * Sends a signal to {@link Client this}, which means login has succeeded.
- */
- void loginSuccess();
-
- /**
- * Connects to {@code address} with configurations. Normally, should log-in after connected.
- *
- * @param ssl the SSL configuration
- * @param address socket address, may be host address, or Unix Domain Socket address
- * @param tcpKeepAlive if enable the {@link ChannelOption#SO_KEEPALIVE}
- * @param tcpNoDelay if enable the {@link ChannelOption#TCP_NODELAY}
- * @param context the connection context
- * @param connectTimeout connect timeout, or {@code null} if it has no timeout
- * @param loopResources the loop resources to use
- * @return A {@link Mono} that will emit a connected {@link Client}.
- * @throws IllegalArgumentException if {@code ssl}, {@code address} or {@code context} is {@code null}.
- * @throws ArithmeticException if {@code connectTimeout} milliseconds overflow as an int
- */
- static Mono connect(MySqlSslConfiguration ssl, SocketAddress address, boolean tcpKeepAlive,
- boolean tcpNoDelay, ConnectionContext context, @Nullable Duration connectTimeout,
- LoopResources loopResources) {
- requireNonNull(ssl, "ssl must not be null");
- requireNonNull(address, "address must not be null");
- requireNonNull(context, "context must not be null");
-
- TcpClient tcpClient = TcpClient.newConnection()
- .runOn(loopResources);
-
- if (connectTimeout != null) {
- tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
- Math.toIntExact(connectTimeout.toMillis()));
- }
-
- if (address instanceof InetSocketAddress) {
- tcpClient = tcpClient.option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
- tcpClient = tcpClient.option(ChannelOption.TCP_NODELAY, tcpNoDelay);
- }
-
- return tcpClient.remoteAddress(() -> address).connect()
- .map(conn -> new ReactorNettyClient(conn, ssl, context));
- }
}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/FailoverClient.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/FailoverClient.java
new file mode 100644
index 000000000..539fbd072
--- /dev/null
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/FailoverClient.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2024 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql.client;
+
+import io.asyncer.r2dbc.mysql.ConnectionContext;
+import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
+import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
+import io.netty.buffer.ByteBufAllocator;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.SynchronousSink;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+
+/**
+ * An implementation of {@link Client} that supports failover.
+ */
+public final class FailoverClient implements Client {
+
+ private final Mono failover;
+
+ private final AtomicReference client;
+
+ public FailoverClient(ReactorNettyClient client, Mono failover) {
+ this.client = new AtomicReference<>(client);
+ this.failover = failover;
+ }
+
+ private Mono reconnectIfNecessary() {
+ return Mono.defer(() -> {
+ ReactorNettyClient client = this.client.get();
+
+ if (client.isChannelOpen() || client.isClosingOrClosed()) {
+ // Open, or closed by user
+ return Mono.just(client);
+ }
+
+ return this.failover.flatMap(c -> {
+ if (this.client.compareAndSet(client, c)) {
+ // TODO: re-init session variables, transaction state, clear prepared cache, etc.
+ return Mono.just(c);
+ }
+
+ // Reconnected by other thread, close this one and retry
+ return c.forceClose().then(reconnectIfNecessary());
+ });
+ });
+ }
+
+ @Override
+ public Flux exchange(ClientMessage request, BiConsumer> handler) {
+ return reconnectIfNecessary().flatMapMany(c -> c.exchange(request, handler));
+ }
+
+ @Override
+ public Flux exchange(FluxExchangeable exchangeable) {
+ return reconnectIfNecessary().flatMapMany(c -> c.exchange(exchangeable));
+ }
+
+ @Override
+ public Mono close() {
+ return Mono.fromSupplier(this.client::get).flatMap(ReactorNettyClient::close);
+ }
+
+ @Override
+ public Mono forceClose() {
+ return Mono.fromSupplier(this.client::get).flatMap(ReactorNettyClient::forceClose);
+ }
+
+ @Override
+ public ByteBufAllocator getByteBufAllocator() {
+ return this.client.get().getByteBufAllocator();
+ }
+
+ @Override
+ public ConnectionContext getContext() {
+ return this.client.get().getContext();
+ }
+
+ @Override
+ public boolean isConnected() {
+ return this.client.get().isConnected();
+ }
+}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java
index 81cb5f21e..69bef4e93 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java
@@ -41,6 +41,7 @@
import reactor.core.publisher.SynchronousSink;
import reactor.netty.Connection;
import reactor.netty.FutureMono;
+import reactor.netty.tcp.TcpClient;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
@@ -54,7 +55,7 @@
/**
* An implementation of client based on the Reactor Netty project.
*/
-final class ReactorNettyClient implements Client {
+public final class ReactorNettyClient implements Client {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ReactorNettyClient.class);
@@ -250,12 +251,10 @@ public boolean isConnected() {
return state < ST_CLOSED && connection.channel().isOpen();
}
- @Override
public void sslUnsupported() {
connection.channel().pipeline().fireUserEventTriggered(SslState.UNSUPPORTED);
}
- @Override
public void loginSuccess() {
if (context.getCapability().isCompression()) {
connection.channel().pipeline().fireUserEventTriggered(PacketEvent.USE_COMPRESSION);
@@ -264,6 +263,14 @@ public void loginSuccess() {
}
}
+ boolean isClosingOrClosed() {
+ return state >= ST_CLOSING;
+ }
+
+ boolean isChannelOpen() {
+ return connection.channel().isOpen();
+ }
+
private static void resetSequence(Connection connection) {
connection.channel().pipeline().fireUserEventTriggered(PacketEvent.RESET_SEQUENCE);
}
@@ -324,6 +331,27 @@ private void handleClose() {
}
}
+ /**
+ * Connects to a MySQL server using the provided {@link TcpClient} and {@link MySqlSslConfiguration}.
+ *
+ * @param tcpClient the configured TCP client
+ * @param ssl the SSL configuration
+ * @param context the connection context
+ * @return A {@link Mono} that will emit a connected {@link Client}.
+ * @throws IllegalArgumentException if {@code tcpClient}, {@code ssl} or {@code context} is {@code null}.
+ */
+ public static Mono connect(
+ TcpClient tcpClient,
+ MySqlSslConfiguration ssl,
+ ConnectionContext context
+ ) {
+ requireNonNull(tcpClient, "tcpClient must not be null");
+ requireNonNull(ssl, "ssl must not be null");
+ requireNonNull(context, "context must not be null");
+
+ return tcpClient.connect().map(conn -> new ReactorNettyClient(conn, ssl, context));
+ }
+
private final class ResponseSubscriber implements CoreSubscriber