Skip to content

Commit

Permalink
Support direct shard database operation routing in Spring JDBC (#31506)
Browse files Browse the repository at this point in the history
Introduce ShardingKeyDataSourceAdapter to get shard connections.

This commit introduces a DataSource proxy, that changes the behavior of the getConnection method to use the `createConnectionBuilder()` api to acquire direct shard connections. The shard connection is acquired by specifying a `ShardingKey` that is correspondent to the wanted shard.
  • Loading branch information
meedbek authored Dec 8, 2023
1 parent d919930 commit e4e2224
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.springframework.jdbc.core;

import java.sql.SQLException;
import java.sql.ShardingKey;

import org.springframework.lang.Nullable;

/**
* Interface defines methods for retrieving sharding keys, which are used to establish
* direct shard connections (in the context of sharded databases). This is used as a
* way of providing the sharding key in
* {@link org.springframework.jdbc.datasource.ShardingKeyDataSourceAdapter}.
*
* @author Mohamed Lahyane (Anir)
*/


public interface ShardingKeyProvider {
/**
* Retrieves the sharding key. This method returns the sharding key relevant to the current context,
* which will be used to obtain a direct shard connection.
*
* @return The sharding key, or null if it is not available or cannot be determined.
* @throws SQLException If an error occurs while obtaining the sharding key.
*/
@Nullable
ShardingKey getShardingKey() throws SQLException;

/**
* Retrieves the super sharding key. This method returns the super sharding key relevant to the
* current context, which will be used to obtain a direct shard connection.
*
* @return The super sharding key, or null if it is not available or cannot be determined.
* @throws SQLException If an error occurs while obtaining the super sharding key.
*/
@Nullable
ShardingKey getSuperShardingKey() throws SQLException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package org.springframework.jdbc.datasource;

import java.sql.Connection;
import java.sql.ConnectionBuilder;
import java.sql.SQLException;
import java.sql.ShardingKey;
import java.sql.ShardingKeyBuilder;

import javax.sql.DataSource;

import org.springframework.core.NamedThreadLocal;
import org.springframework.jdbc.core.ShardingKeyProvider;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
* An adapter for a target {@link DataSource}, designed to apply sharding keys, if specified,
* to every standard {@code getConnection()} call, returning a direct connection to the shard
* corresponding to the specified sharding key value. All other methods are simply delegated
* to the corresponding methods of the target DataSource.
*
* <p>The target {@link DataSource} must implement the {@code createConnectionBuilder()} method;
* otherwise, a {@link java.sql.SQLFeatureNotSupportedException} will be thrown when attempting
* to acquire shard connections.</p>
*
* <p>This proxy datasource takes a {@link ShardingKeyProvider} object as an attribute,
* which is used to get the sharding keys.</p>
*
* @author Mohamed Lahyane (Anir)
* @see #getConnection
* @see #createConnectionBuilder()
* @see UserCredentialsDataSourceAdapter
*/
public class ShardingKeyDataSourceAdapter extends DelegatingDataSource {
@Nullable
private ShardingKeyProvider shardingkeyProvider;

/**
* Creates a new instance of ShardingKeyDataSourceAdapter, wrapping the given {@link DataSource}.
*
* @param dataSource the target DataSource to be wrapped.
*/
public ShardingKeyDataSourceAdapter(DataSource dataSource) {
super(dataSource);
}

/**
* Creates a new instance of ShardingKeyDataSourceAdapter, wrapping the given {@link DataSource}.
*
* @param dataSource the target DataSource to be wrapped.
* @param shardingKeyProvider the ShardingKeyProvider used to get the shardingKeys.
*/
public ShardingKeyDataSourceAdapter(DataSource dataSource, ShardingKeyProvider shardingKeyProvider) {
super(dataSource);
this.shardingkeyProvider = shardingKeyProvider;
}

/**
* Sets the {@link ShardingKeyProvider} for this adapter.
*
* @param shardingKeyProvider the ShardingKeyProvider to set.
*/
public void setShardingKeyProvider(ShardingKeyProvider shardingKeyProvider) {
this.shardingkeyProvider = shardingKeyProvider;
}

/**
* Obtains a connection to the database shard using the provided sharding key
* and super sharding key (if available).
* <p>the sharding key is obtained from the thread local storage, if is {@code null},
* it is obtained from the {@link ShardingKeyProvider}.</p>
*
* @return a Connection object representing a direct shard connection.
* @throws SQLException if an error occurs while creating the connection.
* @see #createConnectionBuilder()
*/
@Override
public Connection getConnection() throws SQLException {
return createConnectionBuilder().build();
}

/**
* Obtains a connection to the database shard using the provided username and password,
* considering the sharding keys (if available) and the given credentials.
*
* @param username the database user on whose behalf the connection is being made.
* @param password the user's password.
* @return a Connection object representing a direct shard connection.
* @throws SQLException if an error occurs while creating the connection.
*/
@Override
public Connection getConnection(String username, String password) throws SQLException {
return createConnectionBuilder().user(username).password(password).build();
}

/**
* Creates a new instance of {@link ConnectionBuilder} using the target DataSource's
* {@code createConnectionBuilder()} method, and sets the appropriate sharding keys
* from the thread-local storage or the {@link ShardingKeyProvider}.
*
* @return a ConnectionBuilder object representing a builder for direct shard connections.
* @throws SQLException if an error occurs while creating the ConnectionBuilder.
*/
@Override
public ConnectionBuilder createConnectionBuilder() throws SQLException {
ConnectionBuilder connectionBuilder = obtainTargetDataSource().createConnectionBuilder();

ShardingKey shardingKey = null;
ShardingKey superShardingKey = null;

if (shardingkeyProvider != null) {
shardingKey = shardingkeyProvider.getShardingKey();
superShardingKey = shardingkeyProvider.getSuperShardingKey();
}

return connectionBuilder.shardingKey(shardingKey).superShardingKey(superShardingKey);
}

/**
* Creates a new instance of {@link ShardingKeyBuilder} using the target DataSource's
* {@code createShardingKeyBuilder()} method.
*
* @return a ShardingKeyBuilder object representing a builder for sharding keys.
* @throws SQLException if an error occurs while creating the ShardingKeyBuilder.
*/
@Override
public ShardingKeyBuilder createShardingKeyBuilder() throws SQLException {
return obtainTargetDataSource().createShardingKeyBuilder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package org.springframework.jdbc.datasource;

import java.sql.Connection;
import java.sql.ConnectionBuilder;
import java.sql.SQLException;
import java.sql.ShardingKey;

import javax.sql.DataSource;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.jdbc.core.ShardingKeyProvider;

import static org.assertj.core.api.Assertions.*;
import static org.mockito.BDDMockito.*;

public class ShardingKeyDataSourceAdapterTests {
private final Connection connection = mock();
private final Connection shardConnection = mock();
private final DataSource dataSource = mock();
private final ConnectionBuilder connectionBuilder = mock(ConnectionBuilder.class, RETURNS_DEEP_STUBS);
private final ConnectionBuilder shardConnectionBuilder = mock(ConnectionBuilder.class, RETURNS_DEEP_STUBS);
private final ShardingKey shardingKey = mock();
private final ShardingKey superShardingKey = mock();
private final ShardingKeyProvider shardingKeyProvider = new ShardingKeyProvider() {
@Override
public ShardingKey getShardingKey() throws SQLException {
return shardingKey;
}

@Override
public ShardingKey getSuperShardingKey() throws SQLException {
return superShardingKey;
}
};

@BeforeEach
public void setUp() throws SQLException {
given(dataSource.createConnectionBuilder()).willReturn(connectionBuilder);
when(connectionBuilder.shardingKey(null).superShardingKey(null)).thenReturn(connectionBuilder);
when(connectionBuilder.shardingKey(shardingKey).superShardingKey(superShardingKey))
.thenReturn(shardConnectionBuilder);
}

@Test
public void testGetConnectionNoKeyProvider() throws SQLException {
ShardingKeyDataSourceAdapter dataSourceAdapter = new ShardingKeyDataSourceAdapter(dataSource);

when(connectionBuilder.build()).thenReturn(connection);

assertThat(dataSourceAdapter.getConnection()).isEqualTo(connection);
}

@Test
public void testGetConnectionWithKeyProvider() throws SQLException {

ShardingKeyDataSourceAdapter dataSourceAdapter = new ShardingKeyDataSourceAdapter(
dataSource,
shardingKeyProvider);

when(shardConnectionBuilder.build()).thenReturn(shardConnection);

assertThat(dataSourceAdapter.getConnection()).isEqualTo(shardConnection);
}

@Test
public void testGetConnectionWithCredentialsNoKeyProvider() throws SQLException {
ShardingKeyDataSourceAdapter dataSourceAdapter = new ShardingKeyDataSourceAdapter(dataSource);

String username = "Anir";
String password = "spring";

Connection userConnection = mock();

when(connectionBuilder.user(username).password(password).build()).thenReturn(userConnection);

assertThat(dataSourceAdapter.getConnection(username, password)).isEqualTo(userConnection);
}

@Test
public void testGetConnectionWithCredentialsAndKeyProvider() throws SQLException {
ShardingKeyDataSourceAdapter dataSourceAdapter = new ShardingKeyDataSourceAdapter(
dataSource,
shardingKeyProvider);

String username = "mbekraou";
String password = "jdbc";

Connection userWithKeyProviderConnection = mock();

when(shardConnectionBuilder.user(username).password(password).build())
.thenReturn(userWithKeyProviderConnection);

assertThat(dataSourceAdapter.getConnection(username, password)).isEqualTo(userWithKeyProviderConnection);
}
}

0 comments on commit e4e2224

Please sign in to comment.