diff --git a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/java/org/elasticsearch/xpack/entsearch/EnterpriseSearchRestIT.java b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/java/org/elasticsearch/xpack/entsearch/EnterpriseSearchRestIT.java index 7d6111d44880f..59c8a677fbc13 100644 --- a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/java/org/elasticsearch/xpack/entsearch/EnterpriseSearchRestIT.java +++ b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/java/org/elasticsearch/xpack/entsearch/EnterpriseSearchRestIT.java @@ -9,14 +9,12 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; -import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/95917") public class EnterpriseSearchRestIT extends ESClientYamlSuiteTestCase { public EnterpriseSearchRestIT(final ClientYamlTestCandidate testCandidate) { diff --git a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/55_search_application_search.yml b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/55_search_application_search.yml index 95b7f57e1fb4c..a87f6dca83e14 100644 --- a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/55_search_application_search.yml +++ b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/55_search_application_search.yml @@ -92,6 +92,9 @@ teardown: --- "Query Search Application with default parameters": + - skip: + version: all + reason: "AwaitsFix https://github.com/elastic/enterprise-search-team/issues/4540" - do: search_application.search: @@ -103,6 +106,9 @@ teardown: --- "Query Search Application overriding part of the parameters": + - skip: + version: all + reason: "AwaitsFix https://github.com/elastic/enterprise-search-team/issues/4540" - do: search_application.search: @@ -117,6 +123,9 @@ teardown: --- "Query Search Application overriding all parameters": + - skip: + version: all + reason: "AwaitsFix https://github.com/elastic/enterprise-search-team/issues/4540" - do: search_application.search: @@ -132,6 +141,9 @@ teardown: --- "Query Search Application with invalid parameter validation": + - skip: + version: all + reason: "AwaitsFix https://github.com/elastic/enterprise-search-team/issues/4540" - do: catch: "bad_request" @@ -144,6 +156,9 @@ teardown: --- "Query Search Application without required parameter": + - skip: + version: all + reason: "AwaitsFix https://github.com/elastic/enterprise-search-team/issues/4540" - do: catch: "bad_request" diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java index f5a8a2fbed2e9..98a44f82c1305 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java @@ -37,7 +37,6 @@ import org.elasticsearch.tracing.Tracer; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xcontent.NamedXContentRegistry; -import org.elasticsearch.xpack.application.analytics.AnalyticsIngestPipelineRegistry; import org.elasticsearch.xpack.application.analytics.AnalyticsTemplateRegistry; import org.elasticsearch.xpack.application.analytics.action.DeleteAnalyticsCollectionAction; import org.elasticsearch.xpack.application.analytics.action.GetAnalyticsCollectionAction; @@ -183,13 +182,7 @@ public Collection createComponents( ); analyticsTemplateRegistry.initialize(); - final AnalyticsIngestPipelineRegistry analyticsPipelineRegistry = new AnalyticsIngestPipelineRegistry( - clusterService, - threadPool, - client - ); - - return Arrays.asList(analyticsTemplateRegistry, analyticsPipelineRegistry); + return Arrays.asList(analyticsTemplateRegistry); } @Override diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistry.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistry.java deleted file mode 100644 index 8cc23246d80c5..0000000000000 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistry.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ -package org.elasticsearch.xpack.application.analytics; - -import org.elasticsearch.Version; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.application.utils.ingest.PipelineRegistry; -import org.elasticsearch.xpack.application.utils.ingest.PipelineTemplateConfiguration; - -import java.util.Collections; -import java.util.List; -import java.util.Set; - -import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX; -import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.ROOT_RESOURCE_PATH; -import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.TEMPLATE_VERSION_VARIABLE; -import static org.elasticsearch.xpack.application.analytics.AnalyticsTemplateRegistry.REGISTRY_VERSION; -import static org.elasticsearch.xpack.core.ClientHelper.ENT_SEARCH_ORIGIN; - -/** - * Set up the behavioral analytics ingest pipelines. - */ -public class AnalyticsIngestPipelineRegistry extends PipelineRegistry { - - // Ingest pipelines configuration. - static final String EVENT_DATA_STREAM_INGEST_PIPELINE_NAME = EVENT_DATA_STREAM_INDEX_PREFIX + "final_pipeline"; - static final List INGEST_PIPELINES = Collections.singletonList( - new PipelineTemplateConfiguration( - EVENT_DATA_STREAM_INGEST_PIPELINE_NAME, - ROOT_RESOURCE_PATH + EVENT_DATA_STREAM_INGEST_PIPELINE_NAME + ".json", - REGISTRY_VERSION, - TEMPLATE_VERSION_VARIABLE - ) - ); - - public AnalyticsIngestPipelineRegistry(ClusterService clusterService, ThreadPool threadPool, Client client) { - super(clusterService, threadPool, client); - } - - @Override - protected String getOrigin() { - return ENT_SEARCH_ORIGIN; - } - - @Override - protected List getIngestPipelineConfigs() { - return INGEST_PIPELINES; - } - - @Override - protected boolean isClusterReady(ClusterChangedEvent event) { - return super.isClusterReady(event) && (isIngestPipelineInstalled(event.state()) || hasAnalyticsEventDataStream(event.state())); - } - - @Override - protected Version getMinSupportedNodeVersion() { - return Version.V_8_8_0; - } - - private boolean hasAnalyticsEventDataStream(ClusterState state) { - Set dataStreamNames = state.metadata().dataStreams().keySet(); - - return dataStreamNames.stream().anyMatch(dataStreamName -> dataStreamName.startsWith(EVENT_DATA_STREAM_INDEX_PREFIX)); - } - - private boolean isIngestPipelineInstalled(ClusterState state) { - return ingestPipelineExists(state, EVENT_DATA_STREAM_INGEST_PIPELINE_NAME); - } -} diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/AnalyticsTemplateRegistry.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/AnalyticsTemplateRegistry.java index c2190c5837528..41b88d625c0d7 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/AnalyticsTemplateRegistry.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/AnalyticsTemplateRegistry.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.template.IndexTemplateConfig; import org.elasticsearch.xpack.core.template.IndexTemplateRegistry; +import org.elasticsearch.xpack.core.template.IngestPipelineConfig; import org.elasticsearch.xpack.core.template.LifecyclePolicyConfig; import java.io.IOException; @@ -48,6 +49,8 @@ public class AnalyticsTemplateRegistry extends IndexTemplateRegistry { static final String EVENT_DATA_STREAM_SETTINGS_COMPONENT_NAME = EVENT_DATA_STREAM_INDEX_PREFIX + "settings"; static final String EVENT_DATA_STREAM_MAPPINGS_COMPONENT_NAME = EVENT_DATA_STREAM_INDEX_PREFIX + "mappings"; + static final String EVENT_DATA_STREAM_INGEST_PIPELINE_NAME = EVENT_DATA_STREAM_INDEX_PREFIX + "final_pipeline"; + static final Map COMPONENT_TEMPLATES; static { @@ -78,6 +81,18 @@ public class AnalyticsTemplateRegistry extends IndexTemplateRegistry { COMPONENT_TEMPLATES = Map.copyOf(componentTemplates); } + @Override + protected List getIngestPipelines() { + return List.of( + new IngestPipelineConfig( + EVENT_DATA_STREAM_INGEST_PIPELINE_NAME, + ROOT_RESOURCE_PATH + EVENT_DATA_STREAM_INGEST_PIPELINE_NAME + ".json", + REGISTRY_VERSION, + TEMPLATE_VERSION_VARIABLE + ) + ); + } + // Composable index templates configuration. static final String EVENT_DATA_STREAM_TEMPLATE_NAME = EVENT_DATA_STREAM_INDEX_PREFIX + "default"; static final String EVENT_DATA_STREAM_TEMPLATE_FILENAME = EVENT_DATA_STREAM_INDEX_PREFIX + "template"; diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/action/SearchApplicationSearchRequest.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/action/SearchApplicationSearchRequest.java index fab11c118a0ad..f8c94936645f1 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/action/SearchApplicationSearchRequest.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/action/SearchApplicationSearchRequest.java @@ -13,6 +13,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.XContentParser; @@ -83,6 +86,11 @@ public ActionRequestValidationException validate() { return validationException; } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/utils/ingest/PipelineRegistry.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/utils/ingest/PipelineRegistry.java deleted file mode 100644 index 01cd11df19016..0000000000000 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/utils/ingest/PipelineRegistry.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.application.utils.ingest; - -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ingest.PutPipelineAction; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.core.Strings; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.ingest.IngestMetadata; -import org.elasticsearch.ingest.PipelineConfiguration; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.json.JsonXContent; - -import java.io.IOException; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.elasticsearch.core.Strings.format; -import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; - -/** - * Allow to create a registry that will manage ingest pipelines setup while the service is started. - * Also, pipeline are versioned, allowing to keep them up to date. - * The work is heavily inspired by {@link org.elasticsearch.xpack.core.template.IndexTemplateRegistry}. - */ -public abstract class PipelineRegistry implements ClusterStateListener { - - private static final Logger logger = LogManager.getLogger(PipelineRegistry.class); - private final ThreadPool threadPool; - private final Client client; - - private final ConcurrentMap pipelineCreationsInProgress = new ConcurrentHashMap<>(); - - public PipelineRegistry(ClusterService clusterService, ThreadPool threadPool, Client client) { - this.threadPool = threadPool; - this.client = client; - clusterService.addListener(this); - } - - @Override - public void clusterChanged(ClusterChangedEvent event) { - - if (isClusterReady(event)) { - addIngestPipelinesIfMissing(event.state()); - } - } - - protected abstract String getOrigin(); - - protected abstract List getIngestPipelineConfigs(); - - protected boolean isClusterReady(ClusterChangedEvent event) { - ClusterState state = event.state(); - if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { - // wait until recovered from disk, so the cluster state view is consistent - return false; - } - - DiscoveryNode masterNode = event.state().getNodes().getMasterNode(); - if (masterNode == null || state.nodes().isLocalNodeElectedMaster() == false) { - // no master node elected or current node is not master - return false; - } - - Version minNodeVersion = event.state().nodes().getMinNodeVersion(); - if (getMinSupportedNodeVersion().after(minNodeVersion)) { - return false; - } - - return true; - } - - /** - * Pipelines will not be installed until all nodes in the cluster are updated to at least the version returned by the method. - * - * @return {@link Version} minimum required version. - */ - protected abstract Version getMinSupportedNodeVersion(); - - private void addIngestPipelinesIfMissing(ClusterState state) { - for (PipelineTemplateConfiguration pipelineTemplateConfig : getIngestPipelineConfigs()) { - PipelineConfiguration newPipeline = pipelineTemplateConfig.load(); - final AtomicBoolean creationCheck = pipelineCreationsInProgress.computeIfAbsent( - newPipeline.getId(), - key -> new AtomicBoolean(false) - ); - - if (creationCheck.compareAndSet(false, true)) { - if (ingestPipelineExists(state, newPipeline.getId())) { - IngestMetadata ingestMetadata = state.metadata().custom(IngestMetadata.TYPE); - PipelineConfiguration existingPipeline = ingestMetadata.getPipelines().get(newPipeline.getId()); - boolean newPipelineHasVersion = Objects.nonNull(newPipeline.getVersion()); - boolean oldPipelineHasVersion = Objects.nonNull(existingPipeline.getVersion()); - if (newPipelineHasVersion - && (oldPipelineHasVersion == false || existingPipeline.getVersion() < newPipeline.getVersion())) { - logger.info( - "upgrading ingest pipeline [{}] for [{}] from version [{}] to version [{}]", - newPipeline.getId(), - getOrigin(), - existingPipeline.getVersion(), - newPipeline.getVersion() - ); - putIngestPipeline(newPipeline, creationCheck); - } else { - logger.debug( - "not adding ingest pipeline [{}] for [{}], because it already exists", - newPipeline.getId(), - getOrigin() - ); - creationCheck.set(false); - } - } else { - logger.debug("adding ingest pipeline [{}] for [{}], because it doesn't exist", newPipeline.getId(), getOrigin()); - putIngestPipeline(newPipeline, creationCheck); - } - } - } - } - - protected boolean ingestPipelineExists(ClusterState state, String pipelineId) { - Optional maybeMeta = Optional.ofNullable(state.metadata().custom(IngestMetadata.TYPE)); - return maybeMeta.isPresent() && maybeMeta.get().getPipelines().containsKey(pipelineId); - } - - private void putIngestPipeline(final PipelineConfiguration pipelineConfig, final AtomicBoolean creationCheck) { - final Executor executor = threadPool.generic(); - executor.execute(() -> { - try { - executeAsyncWithOrigin( - client.threadPool().getThreadContext(), - getOrigin(), - createPutPipelineRequest(pipelineConfig), - new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse response) { - creationCheck.set(false); - if (response.isAcknowledged() == false) { - logger.error( - "error adding ingest pipeline [{}] for [{}], request was not acknowledged", - pipelineConfig.getId(), - getOrigin() - ); - } else { - logger.info("adding ingest pipeline {}", pipelineConfig.getId()); - } - } - - @Override - public void onFailure(Exception e) { - creationCheck.set(false); - onPutPipelineFailure(pipelineConfig.getId(), e); - } - }, - (req, listener) -> client.execute(PutPipelineAction.INSTANCE, req, listener) - ); - - } catch (IOException e) { - creationCheck.set(false); - logger.error( - Strings.format( - "not adding ingest pipeline [{}] for [{}], because of an error when reading the config", - pipelineConfig.getId(), - getOrigin() - ), - e - ); - } - }); - } - - private PutPipelineRequest createPutPipelineRequest(PipelineConfiguration pipelineConfiguration) throws IOException { - try (XContentBuilder builder = JsonXContent.contentBuilder()) { - PutPipelineRequest request = new PutPipelineRequest( - pipelineConfiguration.getId(), - BytesReference.bytes(builder.map(pipelineConfiguration.getConfigAsMap())), - builder.contentType() - ); - - request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); - return request; - } - } - - /** - * Called when creation of an ingest pipeline fails. - * - * @param pipelineId the pipeline that failed to be created. - * @param e The exception that caused the failure. - */ - protected void onPutPipelineFailure(String pipelineId, Exception e) { - logger.error(() -> format("error adding ingest pipeline template [%s] for [%s]", pipelineId, getOrigin()), e); - } -} diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/utils/ingest/PipelineTemplateConfiguration.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/utils/ingest/PipelineTemplateConfiguration.java deleted file mode 100644 index 6566dd83fa1f6..0000000000000 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/utils/ingest/PipelineTemplateConfiguration.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.application.utils.ingest; - -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.ingest.PipelineConfiguration; -import org.elasticsearch.xcontent.XContentType; -import org.elasticsearch.xpack.core.template.TemplateUtils; - -import java.util.Objects; - -public class PipelineTemplateConfiguration { - - private final String id; - private final String resource; - private final int version; - private final String versionProperty; - - public PipelineTemplateConfiguration(String id, String resource, int version, String versionProperty) { - this.id = Objects.requireNonNull(id); - this.resource = Objects.requireNonNull(resource); - this.version = version; - this.versionProperty = Objects.requireNonNull(versionProperty); - } - - public String getId() { - return id; - } - - public int getVersion() { - return version; - } - - public PipelineConfiguration load() { - String config = TemplateUtils.loadTemplate(resource, String.valueOf(version), versionProperty); - - return new PipelineConfiguration(id, new BytesArray(config), XContentType.JSON); - } -} diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistryTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistryTests.java deleted file mode 100644 index f2cae8146890e..0000000000000 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistryTests.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.application.analytics; - -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.ingest.PutPipelineAction; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.DataStream; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.node.TestDiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.TriFunction; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexMode; -import org.elasticsearch.ingest.IngestMetadata; -import org.elasticsearch.ingest.PipelineConfiguration; -import org.elasticsearch.test.ClusterServiceUtils; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.client.NoOpClient; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.json.JsonXContent; -import org.elasticsearch.xpack.application.utils.ingest.PipelineTemplateConfiguration; -import org.junit.After; -import org.junit.Before; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -import static org.elasticsearch.xpack.application.analytics.AnalyticsIngestPipelineRegistry.EVENT_DATA_STREAM_INGEST_PIPELINE_NAME; -import static org.elasticsearch.xpack.application.analytics.AnalyticsTemplateRegistry.REGISTRY_VERSION; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -public class AnalyticsIngestPipelineRegistryTests extends ESTestCase { - private AnalyticsIngestPipelineRegistry registry; - private ThreadPool threadPool; - private VerifyingClient client; - - @Before - public void createRegistryAndClient() { - threadPool = new TestThreadPool(this.getClass().getName()); - client = new VerifyingClient(threadPool); - ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); - registry = new AnalyticsIngestPipelineRegistry(clusterService, threadPool, client); - } - - @After - @Override - public void tearDown() throws Exception { - super.tearDown(); - threadPool.shutdownNow(); - } - - public void testThatNonExistingPipelinesAreAddedImmediately() throws Exception { - DiscoveryNode node = TestDiscoveryNode.create("node"); - DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); - - ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), nodes); - - AtomicInteger calledTimes = new AtomicInteger(0); - client.setVerifier((action, request, listener) -> verifyIngestPipelinesInstalled(calledTimes, action, request, listener)); - registry.clusterChanged(event); - assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getIngestPipelineConfigs().size()))); - } - - public void testIngestPipelinesAlreadyExists() { - DiscoveryNode node = TestDiscoveryNode.create("node"); - DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); - - client.setVerifier((action, request, listener) -> { - if (action instanceof PutPipelineAction) { - fail("if the pipeline already exists it should not be re-put"); - } else { - fail("client called with unexpected request: " + request.toString()); - } - return null; - }); - - Map existingPipelines = registry.getIngestPipelineConfigs() - .stream() - .collect(Collectors.toMap(PipelineTemplateConfiguration::getId, PipelineTemplateConfiguration::getVersion)); - - ClusterChangedEvent event = createClusterChangedEvent(existingPipelines, nodes); - registry.clusterChanged(event); - } - - public void testThatVersionedOldPipelinesAreUpgraded() throws Exception { - DiscoveryNode node = TestDiscoveryNode.create("node"); - DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); - - Map existingPipelines = registry.getIngestPipelineConfigs() - .stream() - .collect(Collectors.toMap(PipelineTemplateConfiguration::getId, pipelineConfig -> pipelineConfig.getVersion() - 1)); - - ClusterChangedEvent event = createClusterChangedEvent(existingPipelines, nodes); - - AtomicInteger calledTimes = new AtomicInteger(0); - client.setVerifier((action, request, listener) -> verifyIngestPipelinesInstalled(calledTimes, action, request, listener)); - registry.clusterChanged(event); - assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getIngestPipelineConfigs().size()))); - } - - public void testThatPipelinesAreNotInstalledWhenNoAnalyticsCollectionExist() { - DiscoveryNode node = TestDiscoveryNode.create("node"); - DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); - - ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), nodes, false); - - client.setVerifier((action, request, listener) -> { - if (action instanceof PutPipelineAction) { - fail("no behavioral analytics collection exists, pipeline should not be installed"); - } else { - fail("client called with unexpected request: " + request.toString()); - } - return null; - }); - - registry.clusterChanged(event); - } - - public void testThatNewerPipelinesAreNotUpgraded() throws Exception { - DiscoveryNode node = TestDiscoveryNode.create("node"); - DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); - - Map existingPipelines = registry.getIngestPipelineConfigs() - .stream() - .collect(Collectors.toMap(PipelineTemplateConfiguration::getId, pipelineConfig -> pipelineConfig.getVersion() + 1)); - - ClusterChangedEvent event = createClusterChangedEvent(existingPipelines, nodes); - - client.setVerifier((action, request, listener) -> { - if (action instanceof PutPipelineAction) { - fail("if the pipeline already exists it should not be re-put"); - } else { - fail("client called with unexpected request: " + request.toString()); - } - return null; - }); - - registry.clusterChanged(event); - } - - public void testThatMissingMasterNodeDoesNothing() { - DiscoveryNode localNode = TestDiscoveryNode.create("node"); - DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").add(localNode).build(); - - client.setVerifier((a, r, l) -> { - fail("if the master is missing nothing should happen"); - return null; - }); - - ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), nodes); - registry.clusterChanged(event); - } - - public void testThatNothingIsInstalledWhenAllNodesAreNotUpdated() { - DiscoveryNode updatedNode = TestDiscoveryNode.create("updatedNode"); - DiscoveryNode outdatedNode = TestDiscoveryNode.create("outdatedNode", ESTestCase.buildNewFakeTransportAddress(), Version.V_8_7_0); - DiscoveryNodes nodes = DiscoveryNodes.builder() - .localNodeId("updatedNode") - .masterNodeId("updatedNode") - .add(updatedNode) - .add(outdatedNode) - .build(); - - client.setVerifier((a, r, l) -> { - fail("if some cluster mode are not updated to at least v.8.8.0 nothing should happen"); - return null; - }); - - ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), nodes); - registry.clusterChanged(event); - } - - public static class VerifyingClient extends NoOpClient { - private TriFunction, ActionRequest, ActionListener, ActionResponse> verifier = (a, r, l) -> { - fail("verifier not set"); - return null; - }; - - VerifyingClient(ThreadPool threadPool) { - super(threadPool); - } - - @Override - @SuppressWarnings("unchecked") - protected void doExecute( - ActionType action, - Request request, - ActionListener listener - ) { - try { - listener.onResponse((Response) verifier.apply(action, request, listener)); - } catch (Exception e) { - listener.onFailure(e); - } - } - - public VerifyingClient setVerifier(TriFunction, ActionRequest, ActionListener, ActionResponse> verifier) { - this.verifier = verifier; - return this; - } - } - - private ActionResponse verifyIngestPipelinesInstalled( - AtomicInteger calledTimes, - ActionType action, - ActionRequest request, - ActionListener listener - ) { - if (action instanceof PutPipelineAction) { - calledTimes.incrementAndGet(); - assertThat(request, instanceOf(PutPipelineRequest.class)); - final PutPipelineRequest putRequest = (PutPipelineRequest) request; - assertThat(putRequest.getId(), equalTo(EVENT_DATA_STREAM_INGEST_PIPELINE_NAME)); - Map requestContentMap = XContentHelper.convertToMap(putRequest.getSource(), false, putRequest.getXContentType()) - .v2(); - assertThat(requestContentMap.get("version"), equalTo(REGISTRY_VERSION)); - assertNotNull(listener); - return AcknowledgedResponse.TRUE; - } else { - fail("client called with unexpected request:" + request.toString()); - return null; - } - } - - private ClusterChangedEvent createClusterChangedEvent(Map existingIngestPipelines, DiscoveryNodes nodes) { - return createClusterChangedEvent(existingIngestPipelines, nodes, true); - } - - private ClusterChangedEvent createClusterChangedEvent( - Map existingIngestPipelines, - DiscoveryNodes nodes, - boolean withDataStreams - ) { - ClusterState cs = createClusterState(existingIngestPipelines, nodes, withDataStreams); - ClusterChangedEvent realEvent = new ClusterChangedEvent( - "created-from-test", - cs, - ClusterState.builder(new ClusterName("test")).build() - ); - ClusterChangedEvent event = spy(realEvent); - when(event.localNodeMaster()).thenReturn(nodes.isLocalNodeElectedMaster()); - - return event; - } - - private ClusterState createClusterState(Map existingIngestPipelines, DiscoveryNodes nodes, boolean withDataStreams) { - Map pipelines = new HashMap<>(); - for (Map.Entry e : existingIngestPipelines.entrySet()) { - pipelines.put(e.getKey(), createMockPipelineConfiguration(e.getKey(), e.getValue())); - } - - Metadata.Builder metadataBuilder = Metadata.builder() - .transientSettings(Settings.EMPTY) - .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)); - - if (withDataStreams) { - DataStream dataStream = createDataStream(); - metadataBuilder.dataStreams( - MapBuilder.newMapBuilder().put(dataStream.getName(), dataStream).map(), - Collections.emptyMap() - ); - } - - return ClusterState.builder(new ClusterName("test")) - .metadata(metadataBuilder) - .blocks(new ClusterBlocks.Builder().build()) - .nodes(nodes) - .build(); - } - - private DataStream createDataStream() { - return new DataStream( - AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX + randomIdentifier(), - randomList(1, 10, () -> new Index(randomIdentifier(), randomIdentifier())), - 0, - Collections.emptyMap(), - false, - false, - false, - false, - IndexMode.STANDARD - ); - } - - private PipelineConfiguration createMockPipelineConfiguration(String pipelineId, int version) { - try (XContentBuilder configBuilder = JsonXContent.contentBuilder().startObject().field("version", version).endObject()) { - BytesReference config = BytesReference.bytes(configBuilder); - return new PipelineConfiguration(pipelineId, config, configBuilder.contentType()); - } catch (IOException e) { - return null; - } - } -} diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsTemplateRegistryTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsTemplateRegistryTests.java index 77fdf1e58474b..661a95af50baf 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsTemplateRegistryTests.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsTemplateRegistryTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction; import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; +import org.elasticsearch.action.ingest.PutPipelineAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; @@ -157,6 +158,10 @@ public void testThatNonExistingPoliciesAreAddedImmediately() throws Exception { AtomicInteger calledTimes = new AtomicInteger(0); client.setVerifier((action, request, listener) -> { + if (action instanceof PutPipelineAction) { + calledTimes.incrementAndGet(); + return AcknowledgedResponse.TRUE; + } if (action instanceof PutLifecycleAction) { calledTimes.incrementAndGet(); assertThat(action, instanceOf(PutLifecycleAction.class)); @@ -179,7 +184,7 @@ public void testThatNonExistingPoliciesAreAddedImmediately() throws Exception { ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), Collections.emptyMap(), nodes); registry.clusterChanged(event); - assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(2))); } public void testPolicyAlreadyExists() { @@ -192,6 +197,10 @@ public void testPolicyAlreadyExists() { policies.forEach(p -> policyMap.put(p.getName(), p)); client.setVerifier((action, request, listener) -> { + if (action instanceof PutPipelineAction) { + // Ignore this, it's verified in another test + return AcknowledgedResponse.TRUE; + } if (action instanceof PutComponentTemplateAction) { // Ignore this, it's verified in another test return AcknowledgedResponse.TRUE; @@ -224,6 +233,10 @@ public void testPolicyAlreadyExistsButDiffers() throws IOException { policies.forEach(p -> policyMap.put(p.getName(), p)); client.setVerifier((action, request, listener) -> { + if (action instanceof PutPipelineAction) { + // Ignore this, it's verified in another test + return AcknowledgedResponse.TRUE; + } if (action instanceof PutComponentTemplateAction) { // Ignore this, it's verified in another test return AcknowledgedResponse.TRUE; @@ -312,6 +325,10 @@ public void testSameOrHigherVersionComponentTemplateNotUpgraded() { versions.put(AnalyticsTemplateRegistry.EVENT_DATA_STREAM_SETTINGS_COMPONENT_NAME, AnalyticsTemplateRegistry.REGISTRY_VERSION); ClusterChangedEvent sameVersionEvent = createClusterChangedEvent(Collections.emptyMap(), versions, nodes); client.setVerifier((action, request, listener) -> { + if (action instanceof PutPipelineAction) { + // Ignore this, it's verified in another test + return AcknowledgedResponse.TRUE; + } if (action instanceof PutComponentTemplateAction) { fail("template should not have been re-installed"); return null; @@ -358,6 +375,36 @@ public void testThatMissingMasterNodeDoesNothing() { registry.clusterChanged(event); } + public void testThatNonExistingPipelinesAreAddedImmediately() throws Exception { + DiscoveryNode node = TestDiscoveryNode.create("node"); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + AtomicInteger calledTimes = new AtomicInteger(0); + client.setVerifier((action, request, listener) -> { + if (action instanceof PutPipelineAction) { + calledTimes.incrementAndGet(); + return AcknowledgedResponse.TRUE; + } + if (action instanceof PutComponentTemplateAction) { + // Ignore this, it's verified in another test + return AcknowledgedResponse.TRUE; + } else if (action instanceof PutLifecycleAction) { + // Ignore this, it's verified in another test + return AcknowledgedResponse.TRUE; + } else if (action instanceof PutComposableIndexTemplateAction) { + // Ignore this, it's verified in another test + return AcknowledgedResponse.TRUE; + } else { + fail("client called with unexpected request: " + request.toString()); + } + return null; + }); + + ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), Collections.emptyMap(), nodes); + registry.clusterChanged(event); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getIngestPipelines().size()))); + } + // ------------- /** @@ -400,6 +447,10 @@ private ActionResponse verifyComposableTemplateInstalled( ActionRequest request, ActionListener listener ) { + if (action instanceof PutPipelineAction) { + // Ignore this, it's verified in another test + return AcknowledgedResponse.TRUE; + } if (action instanceof PutComponentTemplateAction) { // Ignore this, it's verified in another test return AcknowledgedResponse.TRUE; @@ -429,6 +480,9 @@ private ActionResponse verifyComponentTemplateInstalled( ActionRequest request, ActionListener listener ) { + if (action instanceof PutPipelineAction) { + return AcknowledgedResponse.TRUE; + } if (action instanceof PutComponentTemplateAction) { calledTimes.incrementAndGet(); assertThat(action, instanceOf(PutComponentTemplateAction.class));