From 8cb37a4955eb5a79985e4944e5fa933c4c1daed4 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 12 Apr 2018 16:26:18 +0200 Subject: [PATCH 1/8] Add remote cluster client This change adds a client that is connected to a remote cluster. This allows plugins and internal structures to invoke actions on remote clusters just like a if it's a local cluster. The remote cluster must be configured via the cross cluster search infrastructure. --- .../java/org/elasticsearch/client/Client.java | 8 ++ .../elasticsearch/client/FilterClient.java | 5 ++ .../elasticsearch/client/node/NodeClient.java | 13 ++- .../java/org/elasticsearch/node/Node.java | 2 +- .../transport/RemoteClusterAwareClient.java | 65 ++++++++++++++ .../transport/RemoteClusterService.java | 14 +++ .../client/node/NodeClientHeadersTests.java | 2 +- .../transport/RemoteClusterClientTests.java | 90 +++++++++++++++++++ .../RemoteClusterConnectionTests.java | 2 +- 9 files changed, 197 insertions(+), 4 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java create mode 100644 server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java diff --git a/server/src/main/java/org/elasticsearch/client/Client.java b/server/src/main/java/org/elasticsearch/client/Client.java index 2c61653f61ce0..aecbc4a3c2e78 100644 --- a/server/src/main/java/org/elasticsearch/client/Client.java +++ b/server/src/main/java/org/elasticsearch/client/Client.java @@ -477,4 +477,12 @@ public interface Client extends ElasticsearchClient, Releasable { * issued from it. */ Client filterWithHeader(Map headers); + + /** + * Returns a client to a remote cluster with the given cluster alias. + * This method is optinoal and might throw {@link UnsupportedOperationException} + */ + default Client getRemoteClusterClient(String clusterAlias) { + throw new UnsupportedOperationException("this client doesn't support remote cluster connections"); + } } diff --git a/server/src/main/java/org/elasticsearch/client/FilterClient.java b/server/src/main/java/org/elasticsearch/client/FilterClient.java index 23d3c2c3d0c2f..92f6817b74b67 100644 --- a/server/src/main/java/org/elasticsearch/client/FilterClient.java +++ b/server/src/main/java/org/elasticsearch/client/FilterClient.java @@ -73,4 +73,9 @@ protected localNodeId; + private RemoteClusterService remoteClusterService; public NodeClient(Settings settings, ThreadPool threadPool) { super(settings, threadPool); } - public void initialize(Map actions, Supplier localNodeId) { + public void initialize(Map actions, Supplier localNodeId, + RemoteClusterService remoteClusterService) { this.actions = actions; this.localNodeId = localNodeId; + this.remoteClusterService = remoteClusterService; } @Override @@ -117,4 +123,9 @@ > TransportAction transportAction(GenericAction resourcesToClose.addAll(pluginLifecycleComponents); this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents); client.initialize(injector.getInstance(new Key>() {}), - () -> clusterService.localNode().getId()); + () -> clusterService.localNode().getId(), transportService.getRemoteClusterService()); if (NetworkModule.HTTP_ENABLED.get(settings)) { logger.debug("initializing HTTP handlers ..."); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java new file mode 100644 index 0000000000000..f8f78288a8995 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.support.AbstractClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; + +final class RemoteClusterAwareClient extends AbstractClient { + + private final TransportService service; + private final String clusterAlias; + private final RemoteClusterService remoteClusterService; + + RemoteClusterAwareClient(Settings settings, ThreadPool threadPool, TransportService service, String clusterAlias) { + super(settings, threadPool); + this.service = service; + this.clusterAlias = clusterAlias; + this.remoteClusterService = service.getRemoteClusterService(); + } + + @Override + protected > void doExecute(Action action, Request request, ActionListener listener) { + remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> { + Transport.Connection connection = remoteClusterService.getConnection(clusterAlias); + service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, action::newResponse)); + }, + listener::onFailure)); + } + + + @Override + public void close() { + // do nothing + } + + @Override + public Client getRemoteClusterClient(String clusterAlias) { + return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias); + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index b253d9d23df4e..f84f4a6ba18e3 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import org.elasticsearch.client.Client; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -36,6 +37,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; @@ -398,4 +400,16 @@ public void onFailure(Exception e) { }); } } + + /** + * Returns a client to the remote cluster if the given cluster alias exists. + * @param threadPool the {@link ThreadPool} for the client + * @param clusterAlias the cluster alias the remote cluster is registred under + */ + public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) { + if (transportService.getRemoteClusterService().getRemoteClusterNames().contains(clusterAlias)) { + throw new IllegalArgumentException("unknown cluster alias [" + clusterAlias + "]"); + } + return new RemoteClusterAwareClient(settings, threadPool, transportService, clusterAlias); + } } diff --git a/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java b/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java index bca04738d8b89..5e739cc325040 100644 --- a/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java +++ b/server/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java @@ -43,7 +43,7 @@ protected Client buildClient(Settings headersSettings, GenericAction[] testedAct Settings settings = HEADER_SETTINGS; Actions actions = new Actions(settings, threadPool, testedActions); NodeClient client = new NodeClient(settings, threadPool); - client.initialize(actions, () -> "test"); + client.initialize(actions, () -> "test", null); return client; } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java new file mode 100644 index 0000000000000..33877436acec5 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.transport.RemoteClusterConnectionTests.startTransport; + +public class RemoteClusterClientTests extends ESTestCase { + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + public void testConnectAndExecuteRequest() throws Exception { + Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build(); + try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool, + remoteSettings)) { + DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode(); + + Settings localSettings = Settings.builder() + .put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true) + .put("search.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build(); + try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); + Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); + assertNotNull(clusterStateResponse); + assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); + } + } + } + + public void testEnsureWeReconnect() throws Exception { + Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build(); + try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool, + remoteSettings)) { + DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode(); + Settings localSettings = Settings.builder() + .put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true) + .put("search.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build(); + try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + service.disconnectFromNode(remoteNode); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + assertBusy(() -> assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode))); + Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); + assertNotNull(clusterStateResponse); + assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); + } + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 0d8a469981966..c7aab8af69209 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -277,7 +277,7 @@ public void testFilterDiscoveredNodes() throws Exception { } } - private void updateSeedNodes(RemoteClusterConnection connection, List seedNodes) throws Exception { + static void updateSeedNodes(RemoteClusterConnection connection, List seedNodes) throws Exception { CountDownLatch latch = new CountDownLatch(1); AtomicReference exceptionAtomicReference = new AtomicReference<>(); ActionListener listener = ActionListener.wrap(x -> latch.countDown(), x -> { From 13fd567dc75bfbe92e1b31f654dcd6b12462ea48 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 12 Apr 2018 16:33:52 +0200 Subject: [PATCH 2/8] revert unrelated change --- .../elasticsearch/transport/RemoteClusterConnectionTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index c7aab8af69209..0d8a469981966 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -277,7 +277,7 @@ public void testFilterDiscoveredNodes() throws Exception { } } - static void updateSeedNodes(RemoteClusterConnection connection, List seedNodes) throws Exception { + private void updateSeedNodes(RemoteClusterConnection connection, List seedNodes) throws Exception { CountDownLatch latch = new CountDownLatch(1); AtomicReference exceptionAtomicReference = new AtomicReference<>(); ActionListener listener = ActionListener.wrap(x -> latch.countDown(), x -> { From 6fed374e7e43d019b97067deb0fd84e7d67964df Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 12 Apr 2018 16:38:18 +0200 Subject: [PATCH 3/8] remove stale imports --- .../src/main/java/org/elasticsearch/client/node/NodeClient.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/client/node/NodeClient.java b/server/src/main/java/org/elasticsearch/client/node/NodeClient.java index c25ba6df52c5f..69bf5d21f7a4a 100644 --- a/server/src/main/java/org/elasticsearch/client/node/NodeClient.java +++ b/server/src/main/java/org/elasticsearch/client/node/NodeClient.java @@ -33,9 +33,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskListener; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.RemoteClusterAwareClient; import org.elasticsearch.transport.RemoteClusterService; -import org.elasticsearch.transport.TransportService; import java.util.Map; import java.util.function.Supplier; From c2084c5543d087bfed3fc8bea6dd2508b36039cb Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 12 Apr 2018 16:49:44 +0200 Subject: [PATCH 4/8] fix line len --- .../org/elasticsearch/transport/RemoteClusterAwareClient.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index f8f78288a8995..aa476bf4dd267 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -43,7 +43,9 @@ final class RemoteClusterAwareClient extends AbstractClient { } @Override - protected > void doExecute(Action action, Request request, ActionListener listener) { + protected > + void doExecute(Action action, Request request, ActionListener listener) { remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> { Transport.Connection connection = remoteClusterService.getConnection(clusterAlias); service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, From a46c4f57465dd47ee85ca22fabde0ba76e8ec6d4 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 12 Apr 2018 20:24:21 +0200 Subject: [PATCH 5/8] fix alias check --- .../java/org/elasticsearch/transport/RemoteClusterService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index f84f4a6ba18e3..5212cc3526150 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -407,7 +407,7 @@ public void onFailure(Exception e) { * @param clusterAlias the cluster alias the remote cluster is registred under */ public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) { - if (transportService.getRemoteClusterService().getRemoteClusterNames().contains(clusterAlias)) { + if (transportService.getRemoteClusterService().getRemoteClusterNames().contains(clusterAlias) == false) { throw new IllegalArgumentException("unknown cluster alias [" + clusterAlias + "]"); } return new RemoteClusterAwareClient(settings, threadPool, transportService, clusterAlias); From efd34c076be287b97f8372983398615b408d6e6e Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 12 Apr 2018 20:26:56 +0200 Subject: [PATCH 6/8] also test a failure --- .../org/elasticsearch/transport/RemoteClusterClientTests.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java index 33877436acec5..6008b7900a059 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -61,6 +61,10 @@ public void testConnectAndExecuteRequest() throws Exception { ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); assertNotNull(clusterStateResponse); assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); + // also test a failure, there is no handler for search registered + ActionNotFoundTransportException ex = expectThrows(ActionNotFoundTransportException.class, + () -> client.prepareSearch().get()); + assertEquals("No handler for action [indices:data/read/search]", ex.getMessage()); } } } From ac49d1b5c2435e2c8b19fa469b5517246e9fe50f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 13 Apr 2018 15:10:53 +0200 Subject: [PATCH 7/8] fix javadocs --- server/src/main/java/org/elasticsearch/client/Client.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/client/Client.java b/server/src/main/java/org/elasticsearch/client/Client.java index aecbc4a3c2e78..dbf63c97df37e 100644 --- a/server/src/main/java/org/elasticsearch/client/Client.java +++ b/server/src/main/java/org/elasticsearch/client/Client.java @@ -480,7 +480,9 @@ public interface Client extends ElasticsearchClient, Releasable { /** * Returns a client to a remote cluster with the given cluster alias. - * This method is optinoal and might throw {@link UnsupportedOperationException} + * + * @throws IllegalArgumentException if the given clusterAlias doesn't exist + * @throws UnsupportedOperationException if this functionality is not available on this client. */ default Client getRemoteClusterClient(String clusterAlias) { throw new UnsupportedOperationException("this client doesn't support remote cluster connections"); From 47e148b00a0952e5b83616deeaeebef423f617fb Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 13 Apr 2018 15:12:59 +0200 Subject: [PATCH 8/8] fix more javadocs --- server/src/main/java/org/elasticsearch/client/Client.java | 2 +- .../org/elasticsearch/transport/RemoteClusterService.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/client/Client.java b/server/src/main/java/org/elasticsearch/client/Client.java index dbf63c97df37e..adb2f509b999e 100644 --- a/server/src/main/java/org/elasticsearch/client/Client.java +++ b/server/src/main/java/org/elasticsearch/client/Client.java @@ -480,7 +480,7 @@ public interface Client extends ElasticsearchClient, Releasable { /** * Returns a client to a remote cluster with the given cluster alias. - * + * * @throws IllegalArgumentException if the given clusterAlias doesn't exist * @throws UnsupportedOperationException if this functionality is not available on this client. */ diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 5212cc3526150..f454571301777 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -404,7 +404,9 @@ public void onFailure(Exception e) { /** * Returns a client to the remote cluster if the given cluster alias exists. * @param threadPool the {@link ThreadPool} for the client - * @param clusterAlias the cluster alias the remote cluster is registred under + * @param clusterAlias the cluster alias the remote cluster is registered under + * + * @throws IllegalArgumentException if the given clusterAlias doesn't exist */ public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) { if (transportService.getRemoteClusterService().getRemoteClusterNames().contains(clusterAlias) == false) {