diff --git a/driver-core/pom.xml b/driver-core/pom.xml index f7d727012dd..03b427bd8e6 100644 --- a/driver-core/pom.xml +++ b/driver-core/pom.xml @@ -195,6 +195,13 @@ 1.78.1 + + + org.burningwave + tools + test + + diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java index cb2f424df72..7f3c3bde636 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java @@ -34,6 +34,7 @@ import com.datastax.driver.core.utils.MoreFutures; import com.datastax.driver.core.utils.MoreObjects; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -160,6 +161,15 @@ protected Connection tryReconnect() throws ConnectionException { if (isShutdown) throw new ConnectionException(null, "Control connection was shut down"); try { + if (cluster + .configuration + .getQueryOptions() + .shouldAddOriginalContactsToReconnectionPlan()) { + List initialContacts = cluster.metadata.getContactPoints(); + Collections.shuffle(initialContacts); + return reconnectInternal( + Iterators.concat(queryPlan(), initialContacts.iterator()), false); + } return reconnectInternal(queryPlan(), false); } catch (NoHostAvailableException e) { throw new ConnectionException(null, e.getMessage()); diff --git a/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java b/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java index 9fcd6a437d4..137c56aa747 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java +++ b/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java @@ -72,6 +72,8 @@ public class QueryOptions { private volatile boolean schemaQueriesPaged = true; + private volatile boolean addOriginalContactsToReconnectionPlan = false; + /** * Creates a new {@link QueryOptions} instance using the {@link #DEFAULT_CONSISTENCY_LEVEL}, * {@link #DEFAULT_SERIAL_CONSISTENCY_LEVEL} and {@link #DEFAULT_FETCH_SIZE}. @@ -499,6 +501,26 @@ public int getMaxPendingRefreshNodeRequests() { return maxPendingRefreshNodeRequests; } + /** + * Whether the driver should use original contact points when reconnecting to a control node. In + * practice this forces driver to manually add original contact points to the end of the query + * plan. It is possible that it may introduce duplicates (but under differnet Host class + * instances) in the query plan. If this is set to false it does not mean that original contact + * points will be excluded. + * + *

One use case of this feature is that if the original contact point is defined by hostname + * and its IP address changes then setting this to {@code true} allows trying reconnecting to the + * new IP if all connection was lost. + */ + public QueryOptions setAddOriginalContactsToReconnectionPlan(boolean enabled) { + this.addOriginalContactsToReconnectionPlan = enabled; + return this; + } + + public boolean shouldAddOriginalContactsToReconnectionPlan() { + return this.addOriginalContactsToReconnectionPlan; + } + @Override public boolean equals(Object that) { if (that == null || !(that instanceof QueryOptions)) { diff --git a/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java b/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java index 4c9ba61fc57..dc9b807c023 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java +++ b/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java @@ -949,6 +949,7 @@ public static class Builder { private static final Pattern RANDOM_PORT_PATTERN = Pattern.compile(RANDOM_PORT); private String ipPrefix = TestUtils.IP_PREFIX; + private String providedClusterName = null; int[] nodes = {1}; private int[] jmxPorts = {}; private boolean start = true; @@ -991,6 +992,15 @@ public Builder withSniProxy() { return this; } + /** + * Builder takes care of naming and numbering clusters on its own. Use if you really need a + * specific name + */ + public Builder withClusterName(String clusterName) { + this.providedClusterName = clusterName; + return this; + } + /** Enables SSL encryption. */ public Builder withSSL() { cassandraConfiguration.put("client_encryption_options.enabled", "true"); @@ -1115,6 +1125,8 @@ public CCMBridge build() { // be careful NOT to alter internal state (hashCode/equals) during build! String clusterName = TestUtils.generateIdentifier("ccm_"); + if (providedClusterName != null) clusterName = providedClusterName; + VersionNumber dseVersion; VersionNumber cassandraVersion; boolean versionConfigured = this.version != null; diff --git a/driver-core/src/test/java/com/datastax/driver/core/DnsEndpointTests.java b/driver-core/src/test/java/com/datastax/driver/core/DnsEndpointTests.java new file mode 100644 index 00000000000..150f7a3a2dc --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/DnsEndpointTests.java @@ -0,0 +1,105 @@ +package com.datastax.driver.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertTrue; + +import java.net.InetSocketAddress; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +public class DnsEndpointTests { + + private static final Logger logger = LoggerFactory.getLogger(DnsEndpointTests.class); + + @Test(groups = "long") + public void replace_cluster_test() { + // Configure host resolution + MappedHostResolverProvider.addResolverEntry("control.reconnect.test", "127.1.1.1"); + + Cluster cluster = null; + Session session = null; + CCMBridge bridgeA = null; + try { + bridgeA = + CCMBridge.builder() + .withNodes(1) + .withIpPrefix("127.1.1.") + .withBinaryPort(9042) + .withClusterName("same_name") + .build(); + bridgeA.start(); + + cluster = + Cluster.builder() + .addContactPointsWithPorts( + InetSocketAddress.createUnresolved("control.reconnect.test", 9042)) + .withPort(9042) + .withoutAdvancedShardAwareness() + .withQueryOptions(new QueryOptions().setAddOriginalContactsToReconnectionPlan(true)) + .build(); + session = cluster.connect(); + + ResultSet rs = session.execute("select * from system.local"); + Row row = rs.one(); + String address = row.getInet("broadcast_address").toString(); + logger.info("Queried node has broadcast_address: {}}", address); + System.out.flush(); + } finally { + assert bridgeA != null; + bridgeA.close(); + } + + CCMBridge bridgeB = null; + // Overwrite host resolution + MappedHostResolverProvider.removeResolverEntry("control.reconnect.test"); + MappedHostResolverProvider.addResolverEntry("control.reconnect.test", "127.2.2.1"); + try { + bridgeB = + CCMBridge.builder() + .withNodes(1) + .withIpPrefix("127.2.2.") + .withBinaryPort(9042) + .withClusterName("same_name") + .build(); + bridgeB.start(); + Thread.sleep(1000 * 92); + ResultSet rs = session.execute("select * from system.local"); + Row row = rs.one(); + String address = row.getInet("broadcast_address").toString(); + logger.info("Queried node has broadcast_address: {}}", address); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + assert bridgeB != null; + bridgeB.close(); + } + } + + @Test(groups = "long") + public void should_connect_with_mocked_hostname() { + MappedHostResolverProvider.addResolverEntry("mocked.hostname.test", "127.0.1.1"); + try (CCMBridge ccmBridge = + CCMBridge.builder().withNodes(1).withIpPrefix("127.0.1.").withBinaryPort(9042).build(); + Cluster cluster = + Cluster.builder() + .addContactPointsWithPorts( + InetSocketAddress.createUnresolved("mocked.hostname.test", 9042)) + .withPort(9042) + .withoutAdvancedShardAwareness() + .build()) { + ccmBridge.start(); + Session session = cluster.connect(); + ResultSet rs = session.execute("SELECT * FROM system.local"); + List rows = rs.all(); + assertThat(rows).hasSize(1); + Row row = rows.get(0); + assertThat(row.getInet("broadcast_address").toString()).contains("127.0.1.1"); + assertTrue( + session.getCluster().getMetadata().getAllHosts().stream() + .map(Host::toString) + .anyMatch(hostString -> hostString.contains("mocked.hostname.test"))); + } + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/MappedHostResolverProvider.java b/driver-core/src/test/java/com/datastax/driver/core/MappedHostResolverProvider.java new file mode 100644 index 00000000000..1bc4b1884ba --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/MappedHostResolverProvider.java @@ -0,0 +1,34 @@ +package com.datastax.driver.core; + +import org.burningwave.tools.net.DefaultHostResolver; +import org.burningwave.tools.net.HostResolutionRequestInterceptor; +import org.burningwave.tools.net.MappedHostResolver; + +public class MappedHostResolverProvider { + private static volatile MappedHostResolver resolver = null; + + private MappedHostResolverProvider() {} + + public static synchronized boolean setResolver(MappedHostResolver newResolver) { + if (resolver != null) { + return false; + } + resolver = newResolver; + HostResolutionRequestInterceptor.INSTANCE.install(resolver, DefaultHostResolver.INSTANCE); + return true; + } + + public static synchronized void addResolverEntry(String hostname, String address) { + if (resolver == null) { + setResolver(new MappedHostResolver()); + } + resolver.putHost(hostname, address); + } + + public static synchronized void removeResolverEntry(String hostname) { + if (resolver == null) { + return; + } + resolver.removeHost(hostname); + } +} diff --git a/driver-core/src/test/resources/burningwave.static.properties b/driver-core/src/test/resources/burningwave.static.properties new file mode 100644 index 00000000000..7108b42c0fb --- /dev/null +++ b/driver-core/src/test/resources/burningwave.static.properties @@ -0,0 +1,4 @@ +managed-logger.repository=autodetect +managed-logger.repository.enabled=false +banner.hide=true +priority-of-this-configuration=1000 \ No newline at end of file diff --git a/pom.xml b/pom.xml index 838c65b2680..d0defd6cabe 100644 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,7 @@ 1.1.2 1.2.13 3.0.8 + 0.26.2 127.0.1. unit @@ -398,6 +399,12 @@ ${groovy.version} + + org.burningwave + tools + ${burningwave.tools.version} + +