From 31b7ae879e925eb50b6e413031a5b5b16b04c2aa Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 19 Jun 2018 12:04:04 +0200 Subject: [PATCH 1/2] Only connect to formed remote clusters This change prevent remote cluster connections to be established to nodes that have not yet joined a cluster and don't have a cluster UUID. This allows to effectivly detect nodes that are part of the local cluster. To compare the local cluster UUID to the remote nodes cluster UUID we need to wait until we recovered a state and a master is elected before we can connect to remote clusters. Relates to #31331 --- .../action/main/TransportMainAction.java | 1 - .../java/org/elasticsearch/node/Node.java | 80 ++++++++++-- .../transport/RemoteClusterConnection.java | 114 ++++++++++++++---- .../transport/RemoteClusterService.java | 35 +++--- .../transport/TransportService.java | 5 - .../RemoteClusterConnectionTests.java | 84 +++++++++---- .../transport/RemoteClusterServiceTests.java | 49 ++++---- .../transport/MockTcpTransport.java | 1 - 8 files changed, 261 insertions(+), 108 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java b/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java index d560a7ecc11d3..5020c7dc63a32 100644 --- a/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java +++ b/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java @@ -50,7 +50,6 @@ public TransportMainAction(Settings settings, ThreadPool threadPool, TransportSe protected void doExecute(MainRequest request, ActionListener listener) { ClusterState clusterState = clusterService.state(); assert Node.NODE_NAME_SETTING.exists(settings); - final boolean available = clusterState.getBlocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE) == false; listener.onResponse( new MainResponse(Node.NODE_NAME_SETTING.get(settings), Version.CURRENT, clusterState.getClusterName(), clusterState.metaData().clusterUUID(), Build.CURRENT)); diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 4440153dd361e..4b6e622d159d4 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -20,12 +20,14 @@ package org.elasticsearch.node; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.ThreadContext; import org.apache.lucene.util.Constants; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.Action; import org.elasticsearch.action.search.SearchExecutionStatsCollector; @@ -139,6 +141,7 @@ import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportService; @@ -165,8 +168,10 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -678,17 +683,37 @@ public Node start() throws NodeValidationException { : "clusterService has a different local node than the factory provided"; transportService.acceptIncomingRequests(); discovery.startInitialJoin(); + final ThreadPool thread = injector.getInstance(ThreadPool.class); final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings); - if (initialStateTimeout.millis() > 0) { - final ThreadPool thread = injector.getInstance(ThreadPool.class); - ClusterState clusterState = clusterService.state(); - ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext()); - if (clusterState.nodes().getMasterNodeId() == null) { + final boolean connectToRemote = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings); + final boolean waitForState = initialStateTimeout.millis() > 0; + Predicate connectRemoteClusterPredicate = state -> "_na_".equals(state.metaData().clusterUUID()) == false; + ClusterState clusterState = clusterService.state(); + AtomicBoolean connectRemoteClusters = new AtomicBoolean(connectToRemote); + if (waitForState) { + CountDownLatch latch = new CountDownLatch(1); + Predicate clusterStatePredicate = state -> state.nodes().getMasterNodeId() != null; + final Consumer consumer; + if (connectToRemote) { + clusterStatePredicate = clusterStatePredicate.and(connectRemoteClusterPredicate); + connectRemoteClusters.set(false); + consumer = c -> transportService.getRemoteClusterService().initializeRemoteClusters(c.metaData().clusterUUID(), + c.metaData().settings(), ActionListener.wrap(v -> latch.countDown(), e -> { + latch.countDown(); + logger.warn("Failed to connect to remote clusters", e); + })); + } else { + consumer = c -> latch.countDown(); + } + if (clusterStatePredicate.test(clusterState) == false) { logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout); - final CountDownLatch latch = new CountDownLatch(1); + ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, initialStateTimeout, logger, + thread.getThreadContext()); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override - public void onNewClusterState(ClusterState state) { latch.countDown(); } + public void onNewClusterState(ClusterState state) { + consumer.accept(state); + } @Override public void onClusterServiceClose() { @@ -701,13 +726,42 @@ public void onTimeout(TimeValue timeout) { initialStateTimeout); latch.countDown(); } - }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout); + }, clusterStatePredicate); + } else { + consumer.accept(clusterState); + } + try { + latch.await(); + } catch (InterruptedException e) { + throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state"); + } + } + if (connectRemoteClusters.get()) { + Consumer consumer = state -> transportService.getRemoteClusterService().initializeRemoteClusters( + state.metaData().clusterUUID(), state.metaData().settings(), ActionListener.wrap(v -> {}, + e -> logger.warn("Failed to connect to remote clusters", e))); + if (connectRemoteClusterPredicate.test(clusterState) == false) { + ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, + thread.getThreadContext()); + // + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + consumer.accept(state); + } - try { - latch.await(); - } catch (InterruptedException e) { - throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state"); - } + @Override + public void onClusterServiceClose() { + } + + @Override + public void onTimeout(TimeValue timeout) { + assert false; + } + }, connectRemoteClusterPredicate); + + } else { + consumer.accept(clusterState); } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index c86ea61980a87..8cc35ce1c3cca 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -29,10 +29,14 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.main.MainAction; +import org.elasticsearch.action.main.MainRequest; +import org.elasticsearch.action.main.MainResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -50,6 +54,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -83,11 +88,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private final String clusterAlias; private final int maxNumRemoteConnections; private final Predicate nodePredicate; + private final String localClusterUUID; private volatile List seedNodes; private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; - private SetOnce remoteClusterName = new SetOnce<>(); - private final ClusterName localClusterName; + private final SetOnce remoteClusterAndUUID = new SetOnce<>(); /** * Creates a new {@link RemoteClusterConnection} @@ -99,10 +104,15 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo * @param nodePredicate a predicate to filter eligible remote nodes to connect to */ RemoteClusterConnection(Settings settings, String clusterAlias, List seedNodes, - TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate) { + TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, + String localClusterUUID) { super(settings); - this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); this.transportService = transportService; + if ("_na_".equals(localClusterUUID)) { + throw new IllegalArgumentException("invalid local clusterstate UUID: " + localClusterUUID); + } + this.localClusterUUID = Objects.requireNonNull(localClusterUUID); + this.maxNumRemoteConnections = maxNumRemoteConnections; this.nodePredicate = nodePredicate; this.clusterAlias = clusterAlias; @@ -312,7 +322,7 @@ public boolean isClosed() { return connectHandler.isClosed(); } - private ConnectionProfile getRemoteProfile(ClusterName name) { + private ConnectionProfile getRemoteProfile() { // we can only compare the cluster name to make a decision if we should use a remote profile // we can't use a cluster UUID here since we could be connecting to that remote cluster before // the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a @@ -320,7 +330,7 @@ private ConnectionProfile getRemoteProfile(ClusterName name) { // have the same name as the local one is minor here. // the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster, // gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity - if (this.localClusterName.equals(name)) { + if (remoteClusterAndUUID.get() != null && this.localClusterUUID.equals(remoteClusterAndUUID.get().clusterUUID)) { return null; } else { return remoteProfile; @@ -438,6 +448,67 @@ protected void doRun() { }); } + Transport.Connection findFirstReadyNode(Iterator seedNodes) throws IOException, InterruptedException { + boolean remoteClusterHasNotFormed = false; + while (seedNodes.hasNext()) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("remote connect thread got interrupted"); + } + final DiscoveryNode seedNode = seedNodes.next(); + final MainResponse mainResponse; + boolean success = false; + Transport.Connection connection = transportService.openConnection(seedNode, + ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); + ThreadPool threadPool = transportService.getThreadPool(); + ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we stash any context here since this is an internal execution and should not leak any + // existing context information. + threadContext.markAsSystemContext(); + PlainTransportFuture futureHandler = new PlainTransportFuture<>( + new FutureTransportResponseHandler() { + @Override + public MainResponse read(StreamInput in) throws IOException { + MainResponse response = MainAction.INSTANCE.newResponse(); + response.readFrom(in); + return response; + } + }); + TransportRequestOptions options = TransportRequestOptions.builder().withTimeout(remoteProfile + .getHandshakeTimeout().millis()).build(); + transportService.sendRequest(connection, MainAction.NAME, new MainRequest(), options, + futureHandler); + mainResponse = futureHandler.txGet(); + if ("_na_".equals(mainResponse.getClusterUuid()) == false) { + ClusterNameAndUUID clusterNameAndUUID = remoteClusterAndUUID.get(); + if (clusterNameAndUUID == null) { + remoteClusterAndUUID.set(new ClusterNameAndUUID(mainResponse.getClusterName(), + mainResponse.getClusterUuid())); + } else if (clusterNameAndUUID.clusterName.equals(mainResponse.getClusterName()) == false) { + throw new IllegalStateException("handshake failed, mismatched cluster name [" + mainResponse.getClusterName() + + "] - " + seedNode); + } else if (clusterNameAndUUID.clusterUUID.equals(mainResponse.getClusterUuid()) == false) { + throw new IllegalStateException("handshake failed, mismatched cluster UUID [" + mainResponse.getClusterUuid() + + "] - " + seedNode); + } + success = true; + return connection; + } else { + remoteClusterHasNotFormed = true; + } + } finally { + if (success == false) { + connection.close(); + } + } + } + if (remoteClusterHasNotFormed) { + throw new IllegalStateException("seed nodes have not joined a cluster yet"); + } else { + throw new IllegalStateException("no seed node left"); + } + } + void collectRemoteNodes(Iterator seedNodes, final TransportService transportService, ActionListener listener) { if (Thread.currentThread().isInterrupted()) { @@ -446,28 +517,23 @@ void collectRemoteNodes(Iterator seedNodes, try { if (seedNodes.hasNext()) { cancellableThreads.executeIO(() -> { - final DiscoveryNode seedNode = seedNodes.next(); final TransportService.HandshakeResponse handshakeResponse; - Transport.Connection connection = transportService.openConnection(seedNode, - ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); + Transport.Connection connection = findFirstReadyNode(seedNodes); boolean success = false; try { try { handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(), - (c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get())); + (c) -> c.equals(remoteClusterAndUUID.get().clusterName)); } catch (IllegalStateException ex) { logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " + - "cluster name {}", connection.getNode(), remoteClusterName.get()), ex); + "cluster name {}", connection.getNode(), remoteClusterAndUUID.get()), ex); throw ex; } final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode(); if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { - transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName())); - if (remoteClusterName.get() == null) { - assert handshakeResponse.getClusterName().value() != null; - remoteClusterName.set(handshakeResponse.getClusterName()); - } + transportService.connectToNode(handshakeNode, getRemoteProfile()); + assert handshakeResponse.getClusterName().value() != null; connectedNodes.add(handshakeNode); } ClusterStateRequest request = new ClusterStateRequest(); @@ -557,10 +623,6 @@ public ClusterStateResponse newInstance() { @Override public void handleResponse(ClusterStateResponse response) { try { - if (remoteClusterName.get() == null) { - assert response.getClusterName().value() != null; - remoteClusterName.set(response.getClusterName()); - } try (Closeable theConnection = connection) { // the connection is unused - see comment in #collectRemoteNodes // we have to close this connection before we notify listeners - this is mainly needed for test correctness // since if we do it afterwards we might fail assertions that check if all high level connections are closed. @@ -573,7 +635,7 @@ public void handleResponse(ClusterStateResponse response) { for (DiscoveryNode node : nodesIter) { if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) { try { - transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is + transportService.connectToNode(node, getRemoteProfile()); // noop if node is // connected connectedNodes.add(node); } catch (ConnectTransportException | IllegalStateException ex) { @@ -696,4 +758,14 @@ private synchronized void ensureIteratorAvailable() { } } } + + private static class ClusterNameAndUUID { + final ClusterName clusterName; + final String clusterUUID; + + private ClusterNameAndUUID(ClusterName clusterName, String clusterUUID) { + this.clusterName = clusterName; + this.clusterUUID = clusterUUID; + } + } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index a07de63d53734..89891f6f5f0d3 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; @@ -49,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; @@ -103,6 +105,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl private final TransportService transportService; private final int numRemoteConnections; private volatile Map remoteClusters = Collections.emptyMap(); + private final SetOnce localCusterUUID = new SetOnce<>(); RemoteClusterService(Settings settings, TransportService transportService) { super(settings); @@ -116,6 +119,10 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl * @param connectionListener a listener invoked once every configured cluster has been connected to */ private synchronized void updateRemoteClusters(Map> seeds, ActionListener connectionListener) { + if (localCusterUUID.get() == null) { + connectionListener.onFailure(new IllegalStateException("RemoteClusterService is not initialized no cluster uuid set")); + return; + } if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) { throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); } @@ -139,7 +146,7 @@ private synchronized void updateRemoteClusters(Map> if (remote == null) { // this is a new cluster we have to add a new representation remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections, - getNodePredicate(settings)); + getNodePredicate(settings), localCusterUUID.get()); remoteClusters.put(entry.getKey(), remote); } @@ -327,26 +334,18 @@ void updateRemoteCluster( updateRemoteClusters(Collections.singletonMap(clusterAlias, nodes), connectionListener); } - /** - * Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection - * to all configured seed nodes. - */ - void initializeRemoteClusters() { - final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); - final PlainActionFuture future = new PlainActionFuture<>(); - Map> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings); - updateRemoteClusters(seeds, future); - try { - future.get(timeValue.millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (TimeoutException ex) { - logger.warn("failed to connect to remote clusters within {}", timeValue.toString()); - } catch (Exception e) { - throw new IllegalStateException("failed to connect to remote clusters", e); + + public void initializeRemoteClusters(String localClusterUUID, Settings clusterSettings, ActionListener listener) { + if (localClusterUUID.equals("_na_")) { + throw new IllegalArgumentException("invalid local cluster uuid: " + localClusterUUID); } + this.localCusterUUID.set(localClusterUUID); + Map> seeds = RemoteClusterAware.buildRemoteClustersSeeds(Settings.builder().put(settings, false) + .put(clusterSettings).build()); + updateRemoteClusters(seeds, listener); } + @Override public void close() throws IOException { IOUtils.close(remoteClusters.values()); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 656d8c3841769..6026cd51d1ee4 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -233,10 +233,6 @@ protected void doStart() { false, false, (request, channel) -> channel.sendResponse( new HandshakeResponse(localNode, clusterName, localNode.getVersion()))); - if (connectToRemoteCluster) { - // here we start to connect to the remote clusters - remoteClusterService.initializeRemoteClusters(); - } } @Override @@ -413,7 +409,6 @@ public HandshakeResponse newInstance() { } catch (Exception e) { throw new IllegalStateException("handshake failed with " + node, e); } - if (!clusterNamePredicate.test(response.clusterName)) { throw new IllegalStateException("handshake failed, mismatched cluster name [" + response.clusterName + "] - " + node); } else if (response.version.isCompatible(localNode.getVersion()) == false) { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 637b8fb26a880..1ce3b4c4be9ad 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; @@ -29,6 +30,9 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.main.MainAction; +import org.elasticsearch.action.main.MainRequest; +import org.elasticsearch.action.main.MainResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -38,6 +42,7 @@ import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -102,6 +107,10 @@ public static MockTransportService startTransport(String id, List return startTransport(id, knownNodes, version, threadPool, Settings.EMPTY); } + private String clusterUUID() { + return randomBoolean() ? ClusterName.DEFAULT.value() : randomAlphaOfLengthBetween(1, 10); + } + public static MockTransportService startTransport( final String id, final List knownNodes, @@ -113,7 +122,7 @@ public static MockTransportService startTransport( ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(s); MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null); try { - newService.registerRequestHandler(ClusterSearchShardsAction.NAME,ThreadPool.Names.SAME, ClusterSearchShardsRequest::new, + newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ThreadPool.Names.SAME, ClusterSearchShardsRequest::new, (request, channel) -> { if ("index_not_found".equals(request.preference())) { channel.sendResponse(new IndexNotFoundException("index")); @@ -122,6 +131,13 @@ public static MockTransportService startTransport( knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap())); } }); + Writeable.Reader reader = stream -> { + MainRequest mainRequest = new MainRequest(); + mainRequest.readFrom(stream); + return mainRequest; + }; + newService.registerRequestHandler(MainAction.NAME, ThreadPool.Names.SAME, reader , (request, channel) + -> channel.sendResponse(new MainResponse("somenode", version, clusterName, clusterName.value(), Build.CURRENT))); newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new, (request, channel) -> { DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); @@ -142,6 +158,30 @@ public static MockTransportService startTransport( } } + public void testWontConnectUnformedCluster() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, + Settings.builder().put("cluster.name", "_na_").build()); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { + IllegalStateException ise = expectThrows(IllegalStateException.class, + () -> updateSeedNodes(connection, Arrays.asList(seedNode))); + assertEquals("seed nodes have not joined a cluster yet", ise.getMessage()); + } + } + } + } + public void testLocalProfileIsUsedForLocalCluster() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); @@ -156,7 +196,7 @@ public void testLocalProfileIsUsedForLocalCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, "elasticsearch")) { updateSeedNodes(connection, Arrays.asList(seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -196,7 +236,7 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { updateSeedNodes(connection, Arrays.asList(seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -252,7 +292,7 @@ public void testDiscoverSingleNode() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { updateSeedNodes(connection, Arrays.asList(seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -281,7 +321,7 @@ public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, clusterUUID())) { updateSeedNodes(connection, seedNodes); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -308,7 +348,7 @@ public void testNodeDisconnected() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { updateSeedNodes(connection, Arrays.asList(seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -357,7 +397,7 @@ public void testFilterDiscoveredNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, clusterUUID())) { updateSeedNodes(connection, Arrays.asList(seedNode)); if (rejectedNode.equals(seedNode)) { assertFalse(service.nodeConnected(seedNode)); @@ -396,7 +436,7 @@ public void testConnectWithIncompatibleTransports() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(seedNode))); assertFalse(service.nodeConnected(seedNode)); assertTrue(connection.assertNoRunningConnections()); @@ -450,7 +490,7 @@ public Connection getConnection(DiscoveryNode node) { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { connection.addConnectedNode(seedNode); for (DiscoveryNode node : knownNodes) { final Transport.Connection transportConnection = connection.getConnection(node); @@ -493,7 +533,7 @@ public void run() { CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -528,7 +568,7 @@ public void testFetchShards() throws Exception { service.acceptIncomingRequests(); List nodes = Collections.singletonList(seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, Integer.MAX_VALUE, n -> true)) { + nodes, service, Integer.MAX_VALUE, n -> true, clusterUUID())) { if (randomBoolean()) { updateSeedNodes(connection, nodes); } @@ -564,7 +604,7 @@ public void testFetchShardsSkipUnavailable() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { SearchRequest request = new SearchRequest("test-index"); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") @@ -672,7 +712,7 @@ public void testTriggerUpdatesConcurrently() throws IOException, InterruptedExce service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, clusterUUID())) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads); @@ -749,7 +789,7 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, clusterUUID())) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -832,7 +872,7 @@ public void testGetConnectionInfo() throws Exception { service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, maxNumConnections, n -> true)) { + seedNodes, service, maxNumConnections, n -> true, clusterUUID())) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); @@ -979,7 +1019,7 @@ public void testEnsureConnected() throws IOException, InterruptedException { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { assertFalse(service.nodeConnected(seedNode)); assertFalse(service.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -1028,7 +1068,7 @@ public void testCollectNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { if (randomBoolean()) { updateSeedNodes(connection, Arrays.asList(seedNode)); } @@ -1076,7 +1116,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, clusterUUID())) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -1166,7 +1206,7 @@ public void testClusterNameIsChecked() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { updateSeedNodes(connection, Arrays.asList(seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -1183,9 +1223,9 @@ public void testClusterNameIsChecked() throws Exception { assertTrue(connection.assertNoRunningConnections()); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () -> updateSeedNodes(connection, Arrays.asList(otherClusterTransport.getLocalDiscoNode()))); - assertThat(illegalStateException.getMessage(), + assertThat(illegalStateException.getMessage(), startsWith("handshake failed, mismatched cluster name [Cluster [otherCluster]]" + - " - {other_cluster_discoverable_node}")); + " - " + otherClusterTransport.getLocalDiscoNode())); } } } @@ -1239,7 +1279,7 @@ public boolean nodeConnected(DiscoveryNode node) { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { connection.addConnectedNode(connectedNode); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 03d76b5a953c6..f6b8988752ca8 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; @@ -127,7 +128,7 @@ public void testBuiltRemoteClustersSeeds() throws Exception { } - public void testGroupClusterIndices() throws IOException { + public void testGroupClusterIndices() throws IOException, InterruptedException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { @@ -137,8 +138,7 @@ public void testGroupClusterIndices() throws IOException { knownNodes.add(otherSeedTransport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); - try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, - null)) { + try (MockTransportService transportService = startTransport("test", Collections.emptyList(), Version.CURRENT, Settings.EMPTY)) { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); @@ -146,7 +146,7 @@ public void testGroupClusterIndices() throws IOException { builder.putList("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializedRemoteClusterService(service); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); @@ -173,7 +173,7 @@ public void testGroupClusterIndices() throws IOException { } } - public void testIncrementallyAddClusters() throws IOException { + public void testIncrementallyAddClusters() throws IOException, InterruptedException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { @@ -192,7 +192,7 @@ public void testIncrementallyAddClusters() throws IOException { builder.putList("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializedRemoteClusterService(service); assertFalse(service.isCrossClusterSearchEnabled()); service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().address())); assertTrue(service.isCrossClusterSearchEnabled()); @@ -234,11 +234,7 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { knownNodes.add(c2N2Node); Collections.shuffle(knownNodes, random()); - try (MockTransportService transportService = MockTransportService.createNewService( - settings, - Version.CURRENT, - threadPool, - null)) { + try (MockTransportService transportService = startTransport("test", Collections.emptyList(), Version.CURRENT, settings)) { transportService.start(); transportService.acceptIncomingRequests(); final Settings.Builder builder = Settings.builder(); @@ -249,7 +245,7 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializedRemoteClusterService(service); assertFalse(service.isCrossClusterSearchEnabled()); final InetSocketAddress c1N1Address = c1N1Node.getAddress().address(); @@ -306,11 +302,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { knownNodes.add(c2N2Node); Collections.shuffle(knownNodes, random()); - try (MockTransportService transportService = MockTransportService.createNewService( - settings, - Version.CURRENT, - threadPool, - null)) { + try (MockTransportService transportService = startTransport("test", Collections.emptyList(), Version.CURRENT, settings)) { transportService.start(); transportService.acceptIncomingRequests(); final Settings.Builder builder = Settings.builder(); @@ -318,7 +310,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { builder.putList("search.remote.cluster_2.seeds", c2N1Node.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializedRemoteClusterService(service); assertFalse(service.isCrossClusterSearchEnabled()); final InetSocketAddress c1N1Address = c1N1Node.getAddress().address(); @@ -353,7 +345,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { } private ActionListener connectionListener(final CountDownLatch latch) { - return ActionListener.wrap(x -> latch.countDown(), x -> fail()); + return ActionListener.wrap(x -> latch.countDown(), x -> {latch.countDown(); throw new AssertionError(x);}); } public void testCollectNodes() throws InterruptedException, IOException { @@ -380,11 +372,7 @@ public void testCollectNodes() throws InterruptedException, IOException { Collections.shuffle(knownNodes_c1, random()); Collections.shuffle(knownNodes_c2, random()); - try (MockTransportService transportService = MockTransportService.createNewService( - settings, - Version.CURRENT, - threadPool, - null)) { + try (MockTransportService transportService = startTransport("test", Collections.emptyList(), Version.CURRENT, settings)) { transportService.start(); transportService.acceptIncomingRequests(); final Settings.Builder builder = Settings.builder(); @@ -395,7 +383,7 @@ public void testCollectNodes() throws InterruptedException, IOException { try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializedRemoteClusterService(service); assertFalse(service.isCrossClusterSearchEnabled()); final InetSocketAddress c1N1Address = c1N1Node.getAddress().address(); @@ -534,12 +522,12 @@ public void testCollectSearchShards() throws Exception { Settings settings = builder.build(); try { - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + try (MockTransportService service = startTransport("test", Collections.emptyList(), Version.CURRENT, Settings.EMPTY)) { service.start(); service.acceptIncomingRequests(); try (RemoteClusterService remoteClusterService = new RemoteClusterService(settings, service)) { assertFalse(remoteClusterService.isCrossClusterSearchEnabled()); - remoteClusterService.initializeRemoteClusters(); + initializedRemoteClusterService(remoteClusterService); assertTrue(remoteClusterService.isCrossClusterSearchEnabled()); { final CountDownLatch latch = new CountDownLatch(1); @@ -678,6 +666,13 @@ public void onNodeDisconnected(DiscoveryNode node) { } } + private void initializedRemoteClusterService(RemoteClusterService remoteClusterService) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + remoteClusterService.initializeRemoteClusters(randomBoolean() ? ClusterName.DEFAULT.value() : + randomRealisticUnicodeOfLengthBetween(1, 5), Settings.EMPTY, connectionListener(latch)); + latch.await(); + } + public void testRemoteClusterSkipIfDisconnectedSetting() { { Settings settings = Settings.builder() diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 8831c46c01136..02ae1c8307b06 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -172,7 +172,6 @@ protected MockChannel initiateChannel(InetSocketAddress address, ActionListener< if (success == false) { IOUtils.close(socket); } - } executor.submit(() -> { From d9b8865c9a0968c8b8644b62a7f75a7266d54392 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 19 Jun 2018 12:41:31 +0200 Subject: [PATCH 2/2] fix imports --- .../org/elasticsearch/action/main/TransportMainAction.java | 1 - server/src/main/java/org/elasticsearch/node/Node.java | 1 - .../org/elasticsearch/transport/RemoteClusterService.java | 4 ---- 3 files changed, 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java b/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java index 5020c7dc63a32..2b81645eb5c49 100644 --- a/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java +++ b/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.node.Node; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 4b6e622d159d4..31e9370eb7278 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -20,7 +20,6 @@ package org.elasticsearch.node; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.ThreadContext; import org.apache.lucene.util.Constants; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Build; diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 89891f6f5f0d3..c64a220afcc5f 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; @@ -48,9 +47,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function;