From f4c53b8509e8d444c5f750f84d44a03e5f7722a3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 22 Nov 2024 13:23:56 -0800 Subject: [PATCH] Fix CCS exchange when multi cluster aliases point to same cluster (#117297) (#117388) [esql] > Unexpected error from Elasticsearch: illegal_state_exception - sink exchanger for id [ruxoDDxXTGW55oIPHoCT-g:964613010] already exists. This issue occurs when two or more clusterAliases point to the same physical remote cluster. The exchange service assumes the destination is unique, which is not true in this topology. This PR addresses the problem by appending a monotonically increasing numeric suffix to the session ID, ensuring that different exchanges are created in such cases. Another issue arising from this behavior is that data on a remote cluster is processed multiple times, leading to incorrect results. I can work on the fix for this once we agree that this is an issue. --- docs/changelog/117297.yaml | 5 ++ .../test/AbstractMultiClustersTestCase.java | 29 ++++++++---- .../operator/exchange/ExchangeService.java | 5 ++ .../action/CrossClustersCancellationIT.java | 46 +++++++++++++++++++ .../xpack/esql/action/EsqlActionTaskIT.java | 3 +- .../xpack/esql/plugin/ComputeService.java | 22 ++++++--- 6 files changed, 93 insertions(+), 17 deletions(-) create mode 100644 docs/changelog/117297.yaml diff --git a/docs/changelog/117297.yaml b/docs/changelog/117297.yaml new file mode 100644 index 0000000000000..4a0051bbae644 --- /dev/null +++ b/docs/changelog/117297.yaml @@ -0,0 +1,5 @@ +pr: 117297 +summary: Fix CCS exchange when multi cluster aliases point to same cluster +area: ES|QL +type: bug +issues: [] diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java index 7b18cf575f190..ea82c9d21ab89 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java +++ 
b/test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java @@ -17,6 +17,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Strings; import org.elasticsearch.plugins.Plugin; @@ -44,6 +45,7 @@ import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING; import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.not; @@ -149,19 +151,23 @@ public static void stopClusters() throws IOException { } protected void disconnectFromRemoteClusters() throws Exception { - Settings.Builder settings = Settings.builder(); final Set clusterAliases = clusterGroup.clusterAliases(); for (String clusterAlias : clusterAliases) { if (clusterAlias.equals(LOCAL_CLUSTER) == false) { - settings.putNull("cluster.remote." + clusterAlias + ".seeds"); - settings.putNull("cluster.remote." + clusterAlias + ".mode"); - settings.putNull("cluster.remote." + clusterAlias + ".proxy_address"); + removeRemoteCluster(clusterAlias); } } + } + + protected void removeRemoteCluster(String clusterAlias) throws Exception { + Settings.Builder settings = Settings.builder(); + settings.putNull("cluster.remote." + clusterAlias + ".seeds"); + settings.putNull("cluster.remote." + clusterAlias + ".mode"); + settings.putNull("cluster.remote." 
+ clusterAlias + ".proxy_address"); client().admin().cluster().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(settings).get(); assertBusy(() -> { for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) { - assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), empty()); + assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), not(contains(clusterAlias))); } }); } @@ -178,12 +184,17 @@ protected void configureAndConnectsToRemoteClusters() throws Exception { } protected void configureRemoteCluster(String clusterAlias, Collection seedNodes) throws Exception { - final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + "."; - Settings.Builder settings = Settings.builder(); - final List seedAddresses = seedNodes.stream().map(node -> { + final var seedAddresses = seedNodes.stream().map(node -> { final TransportService transportService = cluster(clusterAlias).getInstance(TransportService.class, node); - return transportService.boundAddress().publishAddress().toString(); + return transportService.boundAddress().publishAddress(); }).toList(); + configureRemoteClusterWithSeedAddresses(clusterAlias, seedAddresses); + } + + protected void configureRemoteClusterWithSeedAddresses(String clusterAlias, Collection seedNodes) throws Exception { + final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + "."; + Settings.Builder settings = Settings.builder(); + final List seedAddresses = seedNodes.stream().map(TransportAddress::toString).toList(); boolean skipUnavailable = skipUnavailableForRemoteClusters().containsKey(clusterAlias) ? 
skipUnavailableForRemoteClusters().get(clusterAlias) : DEFAULT_SKIP_UNAVAILABLE; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index 06059944f1310..e6bae7ba385e6 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; @@ -339,6 +340,10 @@ public boolean isEmpty() { return sinks.isEmpty(); } + public Set sinkKeys() { + return sinks.keySet(); + } + @Override protected void doStart() { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java index df6a1e00b0212..c426e0f528eab 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -15,6 +16,7 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import 
org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; @@ -27,8 +29,10 @@ import org.elasticsearch.search.lookup.SearchLookup; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.plugin.ComputeService; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.junit.Before; @@ -40,8 +44,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; import static org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase.randomPragmas; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; @@ -189,4 +195,44 @@ public void testCancel() throws Exception { Exception error = expectThrows(Exception.class, requestFuture::actionGet); assertThat(error.getMessage(), containsString("proxy timeout")); } + + public void testSameRemoteClusters() throws Exception { + TransportAddress address = cluster(REMOTE_CLUSTER).getInstance(TransportService.class).getLocalNode().getAddress(); + int moreClusters = between(1, 5); + for (int i = 0; i < moreClusters; i++) { + String clusterAlias = REMOTE_CLUSTER + "-" + i; + configureRemoteClusterWithSeedAddresses(clusterAlias, List.of(address)); + } + int numDocs = between(10, 100); + createRemoteIndex(numDocs); + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query("FROM *:test | STATS total=sum(const) | LIMIT 1"); + request.pragmas(randomPragmas()); + 
ActionFuture future = client().execute(EsqlQueryAction.INSTANCE, request); + try { + try { + assertBusy(() -> { + List tasks = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(ComputeService.CLUSTER_ACTION_NAME) + .get() + .getTasks(); + assertThat(tasks, hasSize(moreClusters + 1)); + }); + } finally { + PauseFieldPlugin.allowEmitting.countDown(); + } + try (EsqlQueryResponse resp = future.actionGet(30, TimeUnit.SECONDS)) { + // TODO: This produces incorrect results because data on the remote cluster is processed multiple times. + long expectedCount = numDocs * (moreClusters + 1L); + assertThat(getValuesList(resp), equalTo(List.of(List.of(expectedCount)))); + } + } finally { + for (int i = 0; i < moreClusters; i++) { + String clusterAlias = REMOTE_CLUSTER + "-" + i; + removeRemoteCluster(clusterAlias); + } + } + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index cde4f10ef556c..5f299fdca4d31 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -401,7 +401,8 @@ protected void doRun() throws Exception { }); sessionId = foundTasks.get(0).taskId().toString(); assertTrue(fetchingStarted.await(1, TimeUnit.MINUTES)); - ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(sessionId); + String exchangeId = exchangeService.sinkKeys().stream().filter(s -> s.startsWith(sessionId)).findFirst().get(); + ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(exchangeId); waitedForPages = randomBoolean(); if (waitedForPages) { // do not fail exchange requests until we have some pages diff --git 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index fc4c057e52ab6..eeed811674f60 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -82,6 +82,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME; @@ -101,6 +102,7 @@ public class ComputeService { private final EnrichLookupService enrichLookupService; private final LookupFromIndexService lookupFromIndexService; private final ClusterService clusterService; + private final AtomicLong childSessionIdGenerator = new AtomicLong(); public ComputeService( SearchService searchService, @@ -167,7 +169,7 @@ public void execute( return; } var computeContext = new ComputeContext( - sessionId, + newChildSession(sessionId), RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, List.of(), configuration, @@ -330,14 +332,15 @@ private void startComputeOnDataNodes( // the new remote exchange sink, and initialize the computation on the target node via data-node-request. 
for (DataNode node : dataNodeResult.dataNodes()) { var queryPragmas = configuration.pragmas(); + var childSessionId = newChildSession(sessionId); ExchangeService.openExchange( transportService, node.connection, - sessionId, + childSessionId, queryPragmas.exchangeBufferSize(), esqlExecutor, refs.acquire().delegateFailureAndWrap((l, unused) -> { - var remoteSink = exchangeService.newRemoteSink(parentTask, sessionId, transportService, node.connection); + var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, node.connection); exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients()); ActionListener computeResponseListener = computeListener.acquireCompute(clusterAlias); var dataNodeListener = ActionListener.runBefore(computeResponseListener, () -> l.onResponse(null)); @@ -345,7 +348,7 @@ private void startComputeOnDataNodes( node.connection, DATA_ACTION_NAME, new DataNodeRequest( - sessionId, + childSessionId, configuration, clusterAlias, node.shardIds, @@ -378,17 +381,18 @@ private void startComputeOnRemoteClusters( var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink()); try (RefCountingListener refs = new RefCountingListener(linkExchangeListeners)) { for (RemoteCluster cluster : clusters) { + final var childSessionId = newChildSession(sessionId); ExchangeService.openExchange( transportService, cluster.connection, - sessionId, + childSessionId, queryPragmas.exchangeBufferSize(), esqlExecutor, refs.acquire().delegateFailureAndWrap((l, unused) -> { - var remoteSink = exchangeService.newRemoteSink(rootTask, sessionId, transportService, cluster.connection); + var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection); exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients()); var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); - 
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, sessionId, configuration, remotePlan); + var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan); var clusterListener = ActionListener.runBefore( computeListener.acquireCompute(cluster.clusterAlias()), () -> l.onResponse(null) @@ -912,4 +916,8 @@ public List searchExecutionContexts() { return searchContexts.stream().map(ctx -> ctx.getSearchExecutionContext()).toList(); } } + + private String newChildSession(String session) { + return session + "/" + childSessionIdGenerator.incrementAndGet(); + } }