-
Notifications
You must be signed in to change notification settings - Fork 23
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for multiple hosts configuration
- Allow to use Mono for user and password - Add multiple hosts connection strategy - Add HA protocol support for multiple hosts - Allow to use DNS SRV records for HA protocol
- Loading branch information
1 parent
dcbd67a
commit cfef3f1
Showing
47 changed files
with
2,559 additions
and
864 deletions.
There are no files selected for viewing
142 changes: 142 additions & 0 deletions
142
r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionStrategy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
/* | ||
* Copyright 2024 asyncer.io projects | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.asyncer.r2dbc.mysql; | ||
|
||
import io.asyncer.r2dbc.mysql.client.Client; | ||
import io.asyncer.r2dbc.mysql.constant.CompressionAlgorithm; | ||
import io.asyncer.r2dbc.mysql.constant.SslMode; | ||
import io.netty.channel.ChannelOption; | ||
import io.netty.resolver.AddressResolver; | ||
import io.netty.resolver.AddressResolverGroup; | ||
import io.netty.resolver.DefaultNameResolver; | ||
import io.netty.resolver.RoundRobinInetAddressResolver; | ||
import io.netty.util.concurrent.EventExecutor; | ||
import io.netty.util.internal.logging.InternalLogger; | ||
import io.netty.util.internal.logging.InternalLoggerFactory; | ||
import reactor.core.publisher.Mono; | ||
import reactor.netty.resources.LoopResources; | ||
import reactor.netty.tcp.TcpClient; | ||
|
||
import java.net.InetSocketAddress; | ||
import java.time.Duration; | ||
import java.util.Set; | ||
|
||
/** | ||
* An interface of a connection strategy that considers how to obtain a MySQL {@link Client} object. | ||
* | ||
* @since 1.2.0 | ||
*/ | ||
@FunctionalInterface | ||
interface ConnectionStrategy { | ||
|
||
InternalLogger logger = InternalLoggerFactory.getInstance(ConnectionStrategy.class); | ||
|
||
/** | ||
* Establish a connection to a target server that is determined by this connection strategy. | ||
* | ||
* @return a logged-in {@link Client} object. | ||
*/ | ||
Mono<Client> connect(); | ||
|
||
/** | ||
* Creates a general-purpose {@link TcpClient} with the given {@link SocketClientConfiguration}. | ||
* <p> | ||
* Note: Unix Domain Socket also uses this method to create a general-purpose {@link TcpClient client}. | ||
* | ||
* @param configuration socket client configuration. | ||
* @return a general-purpose {@link TcpClient client}. | ||
*/ | ||
static TcpClient createTcpClient(SocketClientConfiguration configuration, boolean balancedDns) { | ||
LoopResources loopResources = configuration.getLoopResources(); | ||
Duration connectTimeout = configuration.getConnectTimeout(); | ||
TcpClient client = TcpClient.newConnection(); | ||
|
||
if (loopResources != null) { | ||
client = client.runOn(loopResources); | ||
} | ||
|
||
if (connectTimeout != null) { | ||
client = client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis())); | ||
} | ||
|
||
if (balancedDns) { | ||
client = client.resolver(BalancedResolverGroup.INSTANCE); | ||
} | ||
|
||
return client; | ||
} | ||
|
||
/** | ||
* Logins to a MySQL server with the given {@link TcpClient}, {@link Credential} and configurations. | ||
* | ||
* @param tcpClient a TCP client to connect to a MySQL server. | ||
* @param credential user and password to log in to a MySQL server. | ||
* @param configuration a configuration that affects login behavior. | ||
* @return a logged-in {@link Client} object. | ||
*/ | ||
static Mono<Client> login( | ||
TcpClient tcpClient, | ||
Credential credential, | ||
MySqlConnectionConfiguration configuration | ||
) { | ||
MySqlSslConfiguration ssl = configuration.getSsl(); | ||
SslMode sslMode = ssl.getSslMode(); | ||
boolean createDbIfNotExist = configuration.isCreateDatabaseIfNotExist(); | ||
String database = configuration.getDatabase(); | ||
String loginDb = createDbIfNotExist ? "" : database; | ||
Set<CompressionAlgorithm> compressionAlgorithms = configuration.getCompressionAlgorithms(); | ||
int zstdLevel = configuration.getZstdCompressionLevel(); | ||
ConnectionContext context = new ConnectionContext( | ||
configuration.getZeroDateOption(), | ||
configuration.getLoadLocalInfilePath(), | ||
configuration.getLocalInfileBufferSize(), | ||
configuration.isPreserveInstants(), | ||
configuration.retrieveConnectionZoneId() | ||
); | ||
|
||
return Client.connect(tcpClient, ssl, context).flatMap(client -> | ||
QueryFlow.login(client, sslMode, loginDb, credential, compressionAlgorithms, zstdLevel, context)); | ||
} | ||
} | ||
|
||
/** | ||
* Resolves the {@link InetSocketAddress} to IP address, randomly select one if it resolves to multiple IP addresses. | ||
* <p> | ||
* Note: DNS resolution should have no relation to the connection strategy of HA protocol. | ||
* | ||
* @since 1.2.0 | ||
*/ | ||
final class BalancedResolverGroup extends AddressResolverGroup<InetSocketAddress> { | ||
|
||
BalancedResolverGroup() { | ||
} | ||
|
||
public static final BalancedResolverGroup INSTANCE; | ||
|
||
static { | ||
INSTANCE = new BalancedResolverGroup(); | ||
Runtime.getRuntime().addShutdownHook(new Thread( | ||
INSTANCE::close, | ||
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook" | ||
)); | ||
} | ||
|
||
@Override | ||
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) { | ||
return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver(); | ||
} | ||
} |
70 changes: 70 additions & 0 deletions
70
r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/Credential.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* Copyright 2024 asyncer.io projects | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.asyncer.r2dbc.mysql; | ||
|
||
import org.jetbrains.annotations.Nullable; | ||
|
||
import java.util.Objects; | ||
|
||
/** | ||
* A value object representing a user with an optional password. | ||
*/ | ||
final class Credential { | ||
|
||
private final String user; | ||
|
||
@Nullable | ||
private final CharSequence password; | ||
|
||
Credential(String user, @Nullable CharSequence password) { | ||
this.user = user; | ||
this.password = password; | ||
} | ||
|
||
String getUser() { | ||
return user; | ||
} | ||
|
||
@Nullable | ||
CharSequence getPassword() { | ||
return password; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (!(o instanceof Credential)) { | ||
return false; | ||
} | ||
|
||
Credential that = (Credential) o; | ||
|
||
return user.equals(that.user) && Objects.equals(password, that.password); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return 31 * user.hashCode() + Objects.hashCode(password); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "Credential{user=" + user + ", password=REDACTED}"; | ||
} | ||
} |
Oops, something went wrong.