From e4e222444946770bda19fe3964a4aadd7e06fd2b Mon Sep 17 00:00:00 2001 From: Mohammed Bekraoui Date: Fri, 8 Dec 2023 23:09:39 +0100 Subject: [PATCH] Support direct shard database operation routing in Spring JDBC (#31506) Introduce ShardingKeyDataSourceAdapter to get shard connections. This commit introduces a DataSource proxy, that changes the behavior of the getConnection method to use the `createConnectionBuilder()` api to acquire direct shard connections. The shard connection is acquired by specifying a `ShardingKey` that is correspondent to the wanted shard. --- .../jdbc/core/ShardingKeyProvider.java | 38 +++++ .../ShardingKeyDataSourceAdapter.java | 130 ++++++++++++++++++ .../ShardingKeyDataSourceAdapterTests.java | 97 +++++++++++++ 3 files changed, 265 insertions(+) create mode 100644 spring-jdbc/src/main/java/org/springframework/jdbc/core/ShardingKeyProvider.java create mode 100644 spring-jdbc/src/main/java/org/springframework/jdbc/datasource/ShardingKeyDataSourceAdapter.java create mode 100644 spring-jdbc/src/test/java/org/springframework/jdbc/datasource/ShardingKeyDataSourceAdapterTests.java diff --git a/spring-jdbc/src/main/java/org/springframework/jdbc/core/ShardingKeyProvider.java b/spring-jdbc/src/main/java/org/springframework/jdbc/core/ShardingKeyProvider.java new file mode 100644 index 000000000000..46725968a04b --- /dev/null +++ b/spring-jdbc/src/main/java/org/springframework/jdbc/core/ShardingKeyProvider.java @@ -0,0 +1,38 @@ +package org.springframework.jdbc.core; + +import java.sql.SQLException; +import java.sql.ShardingKey; + +import org.springframework.lang.Nullable; + +/** + * Interface defines methods for retrieving sharding keys, which are used to establish + * direct shard connections (in the context of sharded databases). This is used as a + * way of providing the sharding key in + * {@link org.springframework.jdbc.datasource.ShardingKeyDataSourceAdapter}. + * + * @author Mohamed Lahyane (Anir) + */ + + +public interface ShardingKeyProvider { + /** + * Retrieves the sharding key. This method returns the sharding key relevant to the current context, + * which will be used to obtain a direct shard connection. + * + * @return The sharding key, or null if it is not available or cannot be determined. + * @throws SQLException If an error occurs while obtaining the sharding key. + */ + @Nullable + ShardingKey getShardingKey() throws SQLException; + + /** + * Retrieves the super sharding key. This method returns the super sharding key relevant to the + * current context, which will be used to obtain a direct shard connection. + * + * @return The super sharding key, or null if it is not available or cannot be determined. + * @throws SQLException If an error occurs while obtaining the super sharding key. + */ + @Nullable + ShardingKey getSuperShardingKey() throws SQLException; +} diff --git a/spring-jdbc/src/main/java/org/springframework/jdbc/datasource/ShardingKeyDataSourceAdapter.java b/spring-jdbc/src/main/java/org/springframework/jdbc/datasource/ShardingKeyDataSourceAdapter.java new file mode 100644 index 000000000000..0b8f1aa189ca --- /dev/null +++ b/spring-jdbc/src/main/java/org/springframework/jdbc/datasource/ShardingKeyDataSourceAdapter.java @@ -0,0 +1,130 @@ +package org.springframework.jdbc.datasource; + +import java.sql.Connection; +import java.sql.ConnectionBuilder; +import java.sql.SQLException; +import java.sql.ShardingKey; +import java.sql.ShardingKeyBuilder; + +import javax.sql.DataSource; + +import org.springframework.core.NamedThreadLocal; +import org.springframework.jdbc.core.ShardingKeyProvider; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * An adapter for a target {@link DataSource}, designed to apply sharding keys, if specified, + * to every standard {@code getConnection()} call, returning a direct connection to the shard + * corresponding to the specified sharding key value. All other methods are simply delegated + * to the corresponding methods of the target DataSource. + * + *

The target {@link DataSource} must implement the {@code createConnectionBuilder()} method; + * otherwise, a {@link java.sql.SQLFeatureNotSupportedException} will be thrown when attempting + * to acquire shard connections.

+ * + *

This proxy datasource takes a {@link ShardingKeyProvider} object as an attribute, + * which is used to get the sharding keys.

+ * + * @author Mohamed Lahyane (Anir) + * @see #getConnection + * @see #createConnectionBuilder() + * @see UserCredentialsDataSourceAdapter + */ +public class ShardingKeyDataSourceAdapter extends DelegatingDataSource { + @Nullable + private ShardingKeyProvider shardingkeyProvider; + + /** + * Creates a new instance of ShardingKeyDataSourceAdapter, wrapping the given {@link DataSource}. + * + * @param dataSource the target DataSource to be wrapped. + */ + public ShardingKeyDataSourceAdapter(DataSource dataSource) { + super(dataSource); + } + + /** + * Creates a new instance of ShardingKeyDataSourceAdapter, wrapping the given {@link DataSource}. + * + * @param dataSource the target DataSource to be wrapped. + * @param shardingKeyProvider the ShardingKeyProvider used to get the shardingKeys. + */ + public ShardingKeyDataSourceAdapter(DataSource dataSource, ShardingKeyProvider shardingKeyProvider) { + super(dataSource); + this.shardingkeyProvider = shardingKeyProvider; + } + + /** + * Sets the {@link ShardingKeyProvider} for this adapter. + * + * @param shardingKeyProvider the ShardingKeyProvider to set. + */ + public void setShardingKeyProvider(ShardingKeyProvider shardingKeyProvider) { + this.shardingkeyProvider = shardingKeyProvider; + } + + /** + * Obtains a connection to the database shard using the provided sharding key + * and super sharding key (if available). + *

the sharding key is obtained from the thread local storage, if is {@code null}, + * it is obtained from the {@link ShardingKeyProvider}.

+ * + * @return a Connection object representing a direct shard connection. + * @throws SQLException if an error occurs while creating the connection. + * @see #createConnectionBuilder() + */ + @Override + public Connection getConnection() throws SQLException { + return createConnectionBuilder().build(); + } + + /** + * Obtains a connection to the database shard using the provided username and password, + * considering the sharding keys (if available) and the given credentials. + * + * @param username the database user on whose behalf the connection is being made. + * @param password the user's password. + * @return a Connection object representing a direct shard connection. + * @throws SQLException if an error occurs while creating the connection. + */ + @Override + public Connection getConnection(String username, String password) throws SQLException { + return createConnectionBuilder().user(username).password(password).build(); + } + + /** + * Creates a new instance of {@link ConnectionBuilder} using the target DataSource's + * {@code createConnectionBuilder()} method, and sets the appropriate sharding keys + * from the thread-local storage or the {@link ShardingKeyProvider}. + * + * @return a ConnectionBuilder object representing a builder for direct shard connections. + * @throws SQLException if an error occurs while creating the ConnectionBuilder. + */ + @Override + public ConnectionBuilder createConnectionBuilder() throws SQLException { + ConnectionBuilder connectionBuilder = obtainTargetDataSource().createConnectionBuilder(); + + ShardingKey shardingKey = null; + ShardingKey superShardingKey = null; + + if (shardingkeyProvider != null) { + shardingKey = shardingkeyProvider.getShardingKey(); + superShardingKey = shardingkeyProvider.getSuperShardingKey(); + } + + return connectionBuilder.shardingKey(shardingKey).superShardingKey(superShardingKey); + } + + /** + * Creates a new instance of {@link ShardingKeyBuilder} using the target DataSource's + * {@code createShardingKeyBuilder()} method. + * + * @return a ShardingKeyBuilder object representing a builder for sharding keys. + * @throws SQLException if an error occurs while creating the ShardingKeyBuilder. + */ + @Override + public ShardingKeyBuilder createShardingKeyBuilder() throws SQLException { + return obtainTargetDataSource().createShardingKeyBuilder(); + } +} diff --git a/spring-jdbc/src/test/java/org/springframework/jdbc/datasource/ShardingKeyDataSourceAdapterTests.java b/spring-jdbc/src/test/java/org/springframework/jdbc/datasource/ShardingKeyDataSourceAdapterTests.java new file mode 100644 index 000000000000..f12471ca30b0 --- /dev/null +++ b/spring-jdbc/src/test/java/org/springframework/jdbc/datasource/ShardingKeyDataSourceAdapterTests.java @@ -0,0 +1,97 @@ +package org.springframework.jdbc.datasource; + +import java.sql.Connection; +import java.sql.ConnectionBuilder; +import java.sql.SQLException; +import java.sql.ShardingKey; + +import javax.sql.DataSource; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.jdbc.core.ShardingKeyProvider; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.BDDMockito.*; + +public class ShardingKeyDataSourceAdapterTests { + private final Connection connection = mock(); + private final Connection shardConnection = mock(); + private final DataSource dataSource = mock(); + private final ConnectionBuilder connectionBuilder = mock(ConnectionBuilder.class, RETURNS_DEEP_STUBS); + private final ConnectionBuilder shardConnectionBuilder = mock(ConnectionBuilder.class, RETURNS_DEEP_STUBS); + private final ShardingKey shardingKey = mock(); + private final ShardingKey superShardingKey = mock(); + private final ShardingKeyProvider shardingKeyProvider = new ShardingKeyProvider() { + @Override + public ShardingKey getShardingKey() throws SQLException { + return shardingKey; + } + + @Override + public ShardingKey getSuperShardingKey() throws SQLException { + return superShardingKey; + } + }; + + @BeforeEach + public void setUp() throws SQLException { + given(dataSource.createConnectionBuilder()).willReturn(connectionBuilder); + when(connectionBuilder.shardingKey(null).superShardingKey(null)).thenReturn(connectionBuilder); + when(connectionBuilder.shardingKey(shardingKey).superShardingKey(superShardingKey)) + .thenReturn(shardConnectionBuilder); + } + + @Test + public void testGetConnectionNoKeyProvider() throws SQLException { + ShardingKeyDataSourceAdapter dataSourceAdapter = new ShardingKeyDataSourceAdapter(dataSource); + + when(connectionBuilder.build()).thenReturn(connection); + + assertThat(dataSourceAdapter.getConnection()).isEqualTo(connection); + } + + @Test + public void testGetConnectionWithKeyProvider() throws SQLException { + + ShardingKeyDataSourceAdapter dataSourceAdapter = new ShardingKeyDataSourceAdapter( + dataSource, + shardingKeyProvider); + + when(shardConnectionBuilder.build()).thenReturn(shardConnection); + + assertThat(dataSourceAdapter.getConnection()).isEqualTo(shardConnection); + } + + @Test + public void testGetConnectionWithCredentialsNoKeyProvider() throws SQLException { + ShardingKeyDataSourceAdapter dataSourceAdapter = new ShardingKeyDataSourceAdapter(dataSource); + + String username = "Anir"; + String password = "spring"; + + Connection userConnection = mock(); + + when(connectionBuilder.user(username).password(password).build()).thenReturn(userConnection); + + assertThat(dataSourceAdapter.getConnection(username, password)).isEqualTo(userConnection); + } + + @Test + public void testGetConnectionWithCredentialsAndKeyProvider() throws SQLException { + ShardingKeyDataSourceAdapter dataSourceAdapter = new ShardingKeyDataSourceAdapter( + dataSource, + shardingKeyProvider); + + String username = "mbekraou"; + String password = "jdbc"; + + Connection userWithKeyProviderConnection = mock(); + + when(shardConnectionBuilder.user(username).password(password).build()) + .thenReturn(userWithKeyProviderConnection); + + assertThat(dataSourceAdapter.getConnection(username, password)).isEqualTo(userWithKeyProviderConnection); + } +} \ No newline at end of file