From f324650dadde821f5c748b30b4327c227243d9d4 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 14 Jun 2018 15:04:07 +0200 Subject: [PATCH 1/5] Ensure we don't use a remote profile if cluster name matches If we are running into a race condition between a node being configured to be a remote node for cross cluster search etc. and that node joining the cluster we might connect to that node with a remote profile. If that node now joins the cluster it connected to it as a CCS remote node we use the wrong profile and can't use bulk connections etc. anymore. This change uses the remote profile only if we connect to a node that has a different cluster name than the local cluster. This is not a perfect fix for this situation but is the safe option while potentially only loose a small optimization of using less connections per node which is small anyways since we only connect to a small set of nodes. Closes #29321 --- .../transport/RemoteClusterConnection.java | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 82b921bd233b0..a646f7c12a0c3 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -87,6 +87,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; private SetOnce remoteClusterName = new SetOnce<>(); + private final ClusterName localClusterName; /** * Creates a new {@link RemoteClusterConnection} @@ -100,6 +101,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo RemoteClusterConnection(Settings settings, String clusterAlias, List seedNodes, TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate) { super(settings); + this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); this.transportService = transportService; this.maxNumRemoteConnections = maxNumRemoteConnections; this.nodePredicate = nodePredicate; @@ -310,6 +312,21 @@ public boolean isClosed() { return connectHandler.isClosed(); } + private ConnectionProfile getRemoteProfile(ClusterName name) { + // we can only compare the cluster name to make a decision if we should use a remote profile + // we can't use a cluster UUID here since we could be connecting to that remote cluster before + // the remote node has joined it cluster and have a cluster UUID. The fact that we just loose a + // rather smallish optimization on the connection layer under certain situations were remote clusters + // have the same name as the local one is minor here. + // the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster, + // gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity + if (this.localClusterName.equals(name)) { + return null; + } else { + return remoteProfile; + } + } + /** * The connect handler manages node discovery and the actual connect to the remote cluster. * There is at most one connect job running at any time. If such a connect job is triggered @@ -419,7 +436,6 @@ protected void doRun() { collectRemoteNodes(seedNodes.iterator(), transportService, listener); } }); - } void collectRemoteNodes(Iterator seedNodes, @@ -444,10 +460,6 @@ void collectRemoteNodes(Iterator seedNodes, "cluster name {}", connection.getNode(), remoteClusterName.get()), ex); throw ex; } - if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { - transportService.connectToNode(handshakeNode, remoteProfile); - connectedNodes.add(handshakeNode); - } ClusterStateRequest request = new ClusterStateRequest(); request.clear(); request.nodes(true); @@ -552,7 +564,8 @@ public void handleResponse(ClusterStateResponse response) { for (DiscoveryNode node : nodesIter) { if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) { try { - transportService.connectToNode(node, remoteProfile); // noop if node is connected + transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is + // connected connectedNodes.add(node); } catch (ConnectTransportException | IllegalStateException ex) { // ISE if we fail the handshake with an version incompatible node From 72812be7bbe2344c9afbd6bea97190154ef1f036 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 14 Jun 2018 15:41:31 +0200 Subject: [PATCH 2/5] fix test --- .../transport/RemoteClusterConnection.java | 14 ++++++++++++-- .../transport/TransportService.java | 18 +++++++++++++----- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index a646f7c12a0c3..a4f4f9b83f9eb 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -447,19 +447,29 @@ void collectRemoteNodes(Iterator seedNodes, if (seedNodes.hasNext()) { cancellableThreads.executeIO(() -> { final DiscoveryNode seedNode = seedNodes.next(); - final DiscoveryNode handshakeNode; + final TransportService.HandshakeResponse handshakeResponse; Transport.Connection connection = transportService.openConnection(seedNode, ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); boolean success = false; try { try { - handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(), + handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(), (c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get())); } catch (IllegalStateException ex) { logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " + "cluster name {}", connection.getNode(), remoteClusterName.get()), ex); throw ex; } + + final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode(); + if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { + transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName())); + if (remoteClusterName.get() == null) { + assert handshakeResponse.getClusterName().value() != null; + remoteClusterName.set(handshakeResponse.getClusterName()); + } + connectedNodes.add(handshakeNode); + } ClusterStateRequest request = new ClusterStateRequest(); request.clear(); request.nodes(true); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 9755898be5fef..656d8c3841769 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -342,7 +342,7 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection } transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> { // We don't validate cluster names to allow for CCS connections. - final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true); + final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode; if (validateConnections && node.equals(remote) == false) { throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote); } @@ -378,7 +378,7 @@ public Transport.Connection openConnection(final DiscoveryNode node, ConnectionP public DiscoveryNode handshake( final Transport.Connection connection, final long handshakeTimeout) throws ConnectTransportException { - return handshake(connection, handshakeTimeout, clusterName::equals); + return handshake(connection, handshakeTimeout, clusterName::equals).discoveryNode; } /** @@ -390,11 +390,11 @@ public DiscoveryNode handshake( * @param connection the connection to a specific node * @param handshakeTimeout handshake timeout * @param clusterNamePredicate cluster name validation predicate - * @return the connected node + * @return the handshake response * @throws ConnectTransportException if the connection failed * @throws IllegalStateException if the handshake failed */ - public DiscoveryNode handshake( + public HandshakeResponse handshake( final Transport.Connection connection, final long handshakeTimeout, Predicate clusterNamePredicate) throws ConnectTransportException { final HandshakeResponse response; @@ -420,7 +420,7 @@ public HandshakeResponse newInstance() { throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node); } - return response.discoveryNode; + return response; } static class HandshakeRequest extends TransportRequest { @@ -461,6 +461,14 @@ public void writeTo(StreamOutput out) throws IOException { clusterName.writeTo(out); Version.writeVersion(version, out); } + + public DiscoveryNode getDiscoveryNode() { + return discoveryNode; + } + + public ClusterName getClusterName() { + return clusterName; + } } public void disconnectFromNode(DiscoveryNode node) { From f85b32e2ec6276df56243a40d24163ee23ed3426 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 14 Jun 2018 16:25:27 +0200 Subject: [PATCH 3/5] fix nits --- .../org/elasticsearch/transport/RemoteClusterConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index a4f4f9b83f9eb..83546d3eb4bfb 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -315,7 +315,7 @@ public boolean isClosed() { private ConnectionProfile getRemoteProfile(ClusterName name) { // we can only compare the cluster name to make a decision if we should use a remote profile // we can't use a cluster UUID here since we could be connecting to that remote cluster before - // the remote node has joined it cluster and have a cluster UUID. The fact that we just loose a + // the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a // rather smallish optimization on the connection layer under certain situations were remote clusters // have the same name as the local one is minor here. // the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster, From e40bdd8c1ce4b616f3123b9ab81b8976ba6e0065 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 14 Jun 2018 16:26:26 +0200 Subject: [PATCH 4/5] fix nits --- .../org/elasticsearch/transport/RemoteClusterConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 83546d3eb4bfb..e37f46c5517db 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -316,7 +316,7 @@ private ConnectionProfile getRemoteProfile(ClusterName name) { // we can only compare the cluster name to make a decision if we should use a remote profile // we can't use a cluster UUID here since we could be connecting to that remote cluster before // the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a - // rather smallish optimization on the connection layer under certain situations were remote clusters + // rather smallish optimization on the connection layer under certain situations where remote clusters // have the same name as the local one is minor here. // the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster, // gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity From 8fb8dd9ee02d50833fb9857a78da807484f3f6dd Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 14 Jun 2018 17:20:10 +0200 Subject: [PATCH 5/5] add test and fix MockTcpTransport to maintain supported channels --- .../RemoteClusterConnectionTests.java | 96 +++++++++++++++++++ .../transport/MockTcpTransport.java | 17 +++- 2 files changed, 112 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index ac6f99351e46d..637b8fb26a880 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -142,6 +142,102 @@ public static MockTransportService startTransport( } } + public void testLocalProfileIsUsedForLocalCluster() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(seedNode)); + assertTrue(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connection.assertNoRunningConnections()); + PlainTransportFuture futureHandler = new PlainTransportFuture<>( + new FutureTransportResponseHandler() { + @Override + public ClusterSearchShardsResponse read(StreamInput in) throws IOException { + ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); + inst.readFrom(in); + return inst; + } + }); + TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) + .build(); + service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), + options, futureHandler); + futureHandler.txGet(); + } + } + } + } + + public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, + Settings.builder().put("cluster.name", "foobar").build()); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, + threadPool, Settings.builder().put("cluster.name", "foobar").build())) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(seedNode)); + assertTrue(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connection.assertNoRunningConnections()); + PlainTransportFuture futureHandler = new PlainTransportFuture<>( + new FutureTransportResponseHandler() { + @Override + public ClusterSearchShardsResponse read(StreamInput in) throws IOException { + ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); + inst.readFrom(in); + return inst; + } + }); + TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) + .build(); + IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> { + service.sendRequest(discoverableNode, + ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler); + futureHandler.txGet(); + }).getCause(); + assertEquals(ise.getMessage(), "can't select channel size is 0 for types: [RECOVERY, BULK, STATE]"); + + PlainTransportFuture handler = new PlainTransportFuture<>( + new FutureTransportResponseHandler() { + @Override + public ClusterSearchShardsResponse read(StreamInput in) throws IOException { + ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); + inst.readFrom(in); + return inst; + } + }); + TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG) + .build(); + service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), + ops, handler); + handler.txGet(); + } + } + } + } + public void testDiscoverSingleNode() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 37bf95d0b153a..8831c46c01136 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -191,7 +191,22 @@ protected MockChannel initiateChannel(InetSocketAddress address, ActionListener< @Override protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) { ConnectionProfile connectionProfile1 = resolveConnectionProfile(connectionProfile, defaultConnectionProfile); - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(LIGHT_PROFILE); + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + Set allTypesWithConnection = new HashSet<>(); + Set allTypesWithoutConnection = new HashSet<>(); + for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile1.getHandles()) { + Set types = handle.getTypes(); + if (handle.length > 0) { + allTypesWithConnection.addAll(types); + } else { + allTypesWithoutConnection.addAll(types); + } + } + // make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them. + builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0])); + if (allTypesWithoutConnection.isEmpty() == false) { + builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0])); + } builder.setHandshakeTimeout(connectionProfile1.getHandshakeTimeout()); builder.setConnectTimeout(connectionProfile1.getConnectTimeout()); return builder.build();