diff --git a/docs/changelog/117297.yaml b/docs/changelog/117297.yaml new file mode 100644 index 0000000000000..4a0051bbae644 --- /dev/null +++ b/docs/changelog/117297.yaml @@ -0,0 +1,5 @@ +pr: 117297 +summary: Fix CCS exchange when multi cluster aliases point to same cluster +area: ES|QL +type: bug +issues: [] diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java index 7b18cf575f190..ea82c9d21ab89 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java @@ -17,6 +17,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Strings; import org.elasticsearch.plugins.Plugin; @@ -44,6 +45,7 @@ import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING; import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.not; @@ -149,19 +151,23 @@ public static void stopClusters() throws IOException { } protected void disconnectFromRemoteClusters() throws Exception { - Settings.Builder settings = Settings.builder(); final Set clusterAliases = clusterGroup.clusterAliases(); for (String clusterAlias : clusterAliases) { if (clusterAlias.equals(LOCAL_CLUSTER) == false) { - settings.putNull("cluster.remote." + clusterAlias + ".seeds"); - settings.putNull("cluster.remote." + clusterAlias + ".mode"); - settings.putNull("cluster.remote." + clusterAlias + ".proxy_address"); + removeRemoteCluster(clusterAlias); } } + } + + protected void removeRemoteCluster(String clusterAlias) throws Exception { + Settings.Builder settings = Settings.builder(); + settings.putNull("cluster.remote." + clusterAlias + ".seeds"); + settings.putNull("cluster.remote." + clusterAlias + ".mode"); + settings.putNull("cluster.remote." + clusterAlias + ".proxy_address"); client().admin().cluster().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(settings).get(); assertBusy(() -> { for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) { - assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), empty()); + assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), not(contains(clusterAlias))); } }); } @@ -178,12 +184,17 @@ protected void configureAndConnectsToRemoteClusters() throws Exception { } protected void configureRemoteCluster(String clusterAlias, Collection seedNodes) throws Exception { - final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + "."; - Settings.Builder settings = Settings.builder(); - final List seedAddresses = seedNodes.stream().map(node -> { + final var seedAddresses = seedNodes.stream().map(node -> { final TransportService transportService = cluster(clusterAlias).getInstance(TransportService.class, node); - return transportService.boundAddress().publishAddress().toString(); + return transportService.boundAddress().publishAddress(); }).toList(); + configureRemoteClusterWithSeedAddresses(clusterAlias, seedAddresses); + } + + protected void configureRemoteClusterWithSeedAddresses(String clusterAlias, Collection seedNodes) throws Exception { + final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + "."; + Settings.Builder settings = Settings.builder(); + final List seedAddresses = seedNodes.stream().map(TransportAddress::toString).toList(); boolean skipUnavailable = skipUnavailableForRemoteClusters().containsKey(clusterAlias) ? skipUnavailableForRemoteClusters().get(clusterAlias) : DEFAULT_SKIP_UNAVAILABLE; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index 06059944f1310..e6bae7ba385e6 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; @@ -339,6 +340,10 @@ public boolean isEmpty() { return sinks.isEmpty(); } + public Set sinkKeys() { + return sinks.keySet(); + } + @Override protected void doStart() { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java index df6a1e00b0212..c426e0f528eab 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -15,6 +16,7 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; @@ -27,8 +29,10 @@ import org.elasticsearch.search.lookup.SearchLookup; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.plugin.ComputeService; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.junit.Before; @@ -40,8 +44,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; import static org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase.randomPragmas; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; @@ -189,4 +195,44 @@ public void testCancel() throws Exception { Exception error = expectThrows(Exception.class, requestFuture::actionGet); assertThat(error.getMessage(), containsString("proxy timeout")); } + + public void testSameRemoteClusters() throws Exception { + TransportAddress address = cluster(REMOTE_CLUSTER).getInstance(TransportService.class).getLocalNode().getAddress(); + int moreClusters = between(1, 5); + for (int i = 0; i < moreClusters; i++) { + String clusterAlias = REMOTE_CLUSTER + "-" + i; + configureRemoteClusterWithSeedAddresses(clusterAlias, List.of(address)); + } + int numDocs = between(10, 100); + createRemoteIndex(numDocs); + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query("FROM *:test | STATS total=sum(const) | LIMIT 1"); + request.pragmas(randomPragmas()); + ActionFuture future = client().execute(EsqlQueryAction.INSTANCE, request); + try { + try { + assertBusy(() -> { + List tasks = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(ComputeService.CLUSTER_ACTION_NAME) + .get() + .getTasks(); + assertThat(tasks, hasSize(moreClusters + 1)); + }); + } finally { + PauseFieldPlugin.allowEmitting.countDown(); + } + try (EsqlQueryResponse resp = future.actionGet(30, TimeUnit.SECONDS)) { + // TODO: This produces incorrect results because data on the remote cluster is processed multiple times. + long expectedCount = numDocs * (moreClusters + 1L); + assertThat(getValuesList(resp), equalTo(List.of(List.of(expectedCount)))); + } + } finally { + for (int i = 0; i < moreClusters; i++) { + String clusterAlias = REMOTE_CLUSTER + "-" + i; + removeRemoteCluster(clusterAlias); + } + } + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index cde4f10ef556c..5f299fdca4d31 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -401,7 +401,8 @@ protected void doRun() throws Exception { }); sessionId = foundTasks.get(0).taskId().toString(); assertTrue(fetchingStarted.await(1, TimeUnit.MINUTES)); - ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(sessionId); + String exchangeId = exchangeService.sinkKeys().stream().filter(s -> s.startsWith(sessionId)).findFirst().get(); + ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(exchangeId); waitedForPages = randomBoolean(); if (waitedForPages) { // do not fail exchange requests until we have some pages diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index fc4c057e52ab6..eeed811674f60 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -82,6 +82,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME; @@ -101,6 +102,7 @@ public class ComputeService { private final EnrichLookupService enrichLookupService; private final LookupFromIndexService lookupFromIndexService; private final ClusterService clusterService; + private final AtomicLong childSessionIdGenerator = new AtomicLong(); public ComputeService( SearchService searchService, @@ -167,7 +169,7 @@ public void execute( return; } var computeContext = new ComputeContext( - sessionId, + newChildSession(sessionId), RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, List.of(), configuration, @@ -330,14 +332,15 @@ private void startComputeOnDataNodes( // the new remote exchange sink, and initialize the computation on the target node via data-node-request. for (DataNode node : dataNodeResult.dataNodes()) { var queryPragmas = configuration.pragmas(); + var childSessionId = newChildSession(sessionId); ExchangeService.openExchange( transportService, node.connection, - sessionId, + childSessionId, queryPragmas.exchangeBufferSize(), esqlExecutor, refs.acquire().delegateFailureAndWrap((l, unused) -> { - var remoteSink = exchangeService.newRemoteSink(parentTask, sessionId, transportService, node.connection); + var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, node.connection); exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients()); ActionListener computeResponseListener = computeListener.acquireCompute(clusterAlias); var dataNodeListener = ActionListener.runBefore(computeResponseListener, () -> l.onResponse(null)); @@ -345,7 +348,7 @@ private void startComputeOnDataNodes( node.connection, DATA_ACTION_NAME, new DataNodeRequest( - sessionId, + childSessionId, configuration, clusterAlias, node.shardIds, @@ -378,17 +381,18 @@ private void startComputeOnRemoteClusters( var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink()); try (RefCountingListener refs = new RefCountingListener(linkExchangeListeners)) { for (RemoteCluster cluster : clusters) { + final var childSessionId = newChildSession(sessionId); ExchangeService.openExchange( transportService, cluster.connection, - sessionId, + childSessionId, queryPragmas.exchangeBufferSize(), esqlExecutor, refs.acquire().delegateFailureAndWrap((l, unused) -> { - var remoteSink = exchangeService.newRemoteSink(rootTask, sessionId, transportService, cluster.connection); + var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection); exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients()); var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); - var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, sessionId, configuration, remotePlan); + var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan); var clusterListener = ActionListener.runBefore( computeListener.acquireCompute(cluster.clusterAlias()), () -> l.onResponse(null) @@ -912,4 +916,8 @@ public List searchExecutionContexts() { return searchContexts.stream().map(ctx -> ctx.getSearchExecutionContext()).toList(); } } + + private String newChildSession(String session) { + return session + "/" + childSessionIdGenerator.incrementAndGet(); + } }