From 9eb59a527f2deb9672d881531fc8d1565a25d326 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 21 Jan 2019 16:17:18 +0000 Subject: [PATCH] [ML] Update ML results mappings on process start This change moves the update to the results index mappings from the open job action to the code that starts the autodetect process. When a rolling upgrade is performed we need to update the mappings for already-open jobs that are reassigned from an old version node to a new version node, but the open job action is not called in this case. Closes #37607 --- .../persistence/ElasticsearchMappings.java | 112 ++++++++++++++++ .../ElasticsearchMappingsTests.java | 115 ++++++++++++++++ .../ml/action/TransportOpenJobAction.java | 125 ++---------------- .../autodetect/AutodetectProcessManager.java | 112 +++++++++------- .../action/TransportOpenJobActionTests.java | 114 ---------------- .../AutodetectProcessManagerTests.java | 51 ++++--- .../upgrades/MlMappingsUpgradeIT.java | 112 ++++++++++++++++ 7 files changed, 440 insertions(+), 301 deletions(-) create mode 100644 x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index 1b314a4a2f3cb..20035a3ce5dd7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -5,8 +5,23 @@ */ package org.elasticsearch.xpack.core.ml.job.persistence; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; +import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig; @@ -38,10 +53,16 @@ import org.elasticsearch.xpack.core.ml.notifications.AuditMessage; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** * Static methods to create Elasticsearch index mappings for the autodetect @@ -968,4 +989,95 @@ public static XContentBuilder auditMessageMapping() throws IOException { .endObject() .endObject(); } + + static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion, + Logger logger) throws IOException { + List indicesToUpdate = new ArrayList<>(); + + ImmutableOpenMap> currentMapping = state.metaData().findMappings(concreteIndices, + new String[] {DOC_TYPE}, MapperPlugin.NOOP_FIELD_FILTER); + + for (String index : concreteIndices) { + ImmutableOpenMap innerMap = currentMapping.get(index); + if (innerMap != null) { + MappingMetaData metaData = innerMap.get(DOC_TYPE); + try { + @SuppressWarnings("unchecked") + Map meta = (Map) metaData.sourceAsMap().get("_meta"); + if (meta != null) { + String versionString = (String) meta.get("version"); + if (versionString == null) { + logger.info("Version of mappings for [{}] not found, recreating", index); + indicesToUpdate.add(index); + continue; + } + + Version mappingVersion = Version.fromString(versionString); + + if (mappingVersion.onOrAfter(minVersion)) { + continue; + } else { + logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT); + indicesToUpdate.add(index); + continue; + } + } else { + logger.info("Version of mappings for [{}] not found, recreating", index); + indicesToUpdate.add(index); + continue; + } + } catch (Exception e) { + logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e); + indicesToUpdate.add(index); + continue; + } + } else { + logger.info("No mappings found for [{}], recreating", index); + indicesToUpdate.add(index); + } + } + return indicesToUpdate.toArray(new String[indicesToUpdate.size()]); + } + + public static void addDocMappingIfMissing(String alias, CheckedSupplier mappingSupplier, + Client client, Logger logger, ClusterState state, ActionListener listener) { + AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); + if (aliasOrIndex == null) { + // The index has never been created yet + listener.onResponse(true); + return; + } + String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName) + .toArray(String[]::new); + + String[] indicesThatRequireAnUpdate; + try { + indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT, logger); + } catch (IOException e) { + listener.onFailure(e); + return; + } + + if (indicesThatRequireAnUpdate.length > 0) { + try (XContentBuilder mapping = mappingSupplier.get()) { + PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate); + putMappingRequest.type(DOC_TYPE); + putMappingRequest.source(mapping); + executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest, + ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + listener.onResponse(true); + } else { + listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices " + + Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged")); + } + }, listener::onFailure)); + } catch (IOException e) { + listener.onFailure(e); + } + } else { + logger.trace("Mappings are up to date."); + listener.onResponse(true); + } + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java index e4ce536a3ccf6..d15529eb52efd 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java @@ -9,10 +9,18 @@ import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; @@ -30,6 +38,8 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -128,6 +138,111 @@ public void testTermFieldMapping() throws IOException { assertNull(instanceMapping); } + + public void testMappingRequiresUpdateNoMapping() throws IOException { + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + ClusterState cs = csBuilder.build(); + String[] indices = new String[] { "no_index" }; + + assertArrayEquals(new String[] { "no_index" }, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + public void testMappingRequiresUpdateNullMapping() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null)); + String[] indices = new String[] { "null_index" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + public void testMappingRequiresUpdateNoVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD")); + String[] indices = new String[] { "no_version_field" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + public void testMappingRequiresUpdateRecentMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString())); + String[] indices = new String[] { "version_current" }; + assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData( + Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0"))); + String[] indices = new String[] { "version_nested" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + public void testMappingRequiresUpdateBogusMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0")); + String[] indices = new String[] { "version_bogus" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + public void testMappingRequiresUpdateNewerMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT)); + String[] indices = new String[] { "version_newer" }; + assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion(), + logger)); + } + + public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT)); + String[] indices = new String[] { "version_newer_minor" }; + assertArrayEquals(new String[] {}, + ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion(), logger)); + } + + public void testMappingRequiresUpdateSomeVersionMix() throws IOException { + Map versionMix = new HashMap<>(); + versionMix.put("version_54", Version.V_5_4_0); + versionMix.put("version_current", Version.CURRENT); + versionMix.put("version_null", null); + versionMix.put("version_current2", Version.CURRENT); + versionMix.put("version_bogus", "0.0.0"); + versionMix.put("version_current3", Version.CURRENT); + versionMix.put("version_bogus2", "0.0.0"); + + ClusterState cs = getClusterStateWithMappingsWithMetaData(versionMix); + String[] indices = new String[] { "version_54", "version_null", "version_bogus", "version_bogus2" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + private ClusterState getClusterStateWithMappingsWithMetaData(Map namesAndVersions) throws IOException { + MetaData.Builder metaDataBuilder = MetaData.builder(); + + for (Map.Entry entry : namesAndVersions.entrySet()) { + + String indexName = entry.getKey(); + Object version = entry.getValue(); + + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); + indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); + + Map mapping = new HashMap<>(); + Map properties = new HashMap<>(); + for (int i = 0; i < 10; i++) { + properties.put("field" + i, Collections.singletonMap("type", "string")); + } + mapping.put("properties", properties); + + Map meta = new HashMap<>(); + if (version != null && version.equals("NO_VERSION_FIELD") == false) { + meta.put("version", version); + } + mapping.put("_meta", meta); + + indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping)); + + metaDataBuilder.put(indexMetaData); + } + MetaData metaData = metaDataBuilder.build(); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.metaData(metaData); + return csBuilder.build(); + } + private Set collectResultsDocFieldNames() throws IOException { // Only the mappings for the results index should be added below. Do NOT add mappings for other indexes here. return collectFieldNames(ElasticsearchMappings.resultsMapping()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 3b8a7d97c25dd..f8ac255f63ac1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -7,14 +7,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -23,23 +20,16 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.AliasOrIndex; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.Index; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -47,7 +37,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.persistent.PersistentTasksService; -import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -78,9 +67,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; -import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -433,55 +420,6 @@ private static boolean jobHasRules(Job job) { return job.getAnalysisConfig().getDetectors().stream().anyMatch(d -> d.getRules().isEmpty() == false); } - public static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion, Logger logger) - throws IOException { - - List indicesToUpdate = new ArrayList<>(); - - ImmutableOpenMap> currentMapping = state.metaData().findMappings(concreteIndices, - new String[] { ElasticsearchMappings.DOC_TYPE }, MapperPlugin.NOOP_FIELD_FILTER); - - for (String index : concreteIndices) { - ImmutableOpenMap innerMap = currentMapping.get(index); - if (innerMap != null) { - MappingMetaData metaData = innerMap.get(ElasticsearchMappings.DOC_TYPE); - try { - Map meta = (Map) metaData.sourceAsMap().get("_meta"); - if (meta != null) { - String versionString = (String) meta.get("version"); - if (versionString == null) { - logger.info("Version of mappings for [{}] not found, recreating", index); - indicesToUpdate.add(index); - continue; - } - - Version mappingVersion = Version.fromString(versionString); - - if (mappingVersion.onOrAfter(minVersion)) { - continue; - } else { - logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT); - indicesToUpdate.add(index); - continue; - } - } else { - logger.info("Version of mappings for [{}] not found, recreating", index); - indicesToUpdate.add(index); - continue; - } - } catch (Exception e) { - logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e); - indicesToUpdate.add(index); - continue; - } - } else { - logger.info("No mappings found for [{}], recreating", index); - indicesToUpdate.add(index); - } - } - return indicesToUpdate.toArray(new String[indicesToUpdate.size()]); - } - @Override protected String executor() { // This api doesn't do heavy or blocking operations (just delegates PersistentTasksService), @@ -613,25 +551,18 @@ public void onFailure(Exception e) { ); // Try adding state doc mapping - ActionListener resultsPutMappingHandler = ActionListener.wrap( + ActionListener getJobHandler = ActionListener.wrap( response -> { - addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings::stateMapping, - state, missingMappingsListener); + ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), + ElasticsearchMappings::stateMapping, client, logger, state, missingMappingsListener); }, listener::onFailure ); // Get the job config jobManager.getJob(jobParams.getJobId(), ActionListener.wrap( job -> { - try { - jobParams.setJob(job); - - // Try adding results doc mapping - addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobParams.getJobId()), - ElasticsearchMappings::resultsMapping, state, resultsPutMappingHandler); - } catch (Exception e) { - listener.onFailure(e); - } + jobParams.setJob(job); + getJobHandler.onResponse(null); }, listener::onFailure )); @@ -738,48 +669,6 @@ public void onFailure(Exception e) { ); } - private void addDocMappingIfMissing(String alias, CheckedSupplier mappingSupplier, ClusterState state, - ActionListener listener) { - AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); - if (aliasOrIndex == null) { - // The index has never been created yet - listener.onResponse(true); - return; - } - String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName) - .toArray(String[]::new); - - String[] indicesThatRequireAnUpdate; - try { - indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT, logger); - } catch (IOException e) { - listener.onFailure(e); - return; - } - - if (indicesThatRequireAnUpdate.length > 0) { - try (XContentBuilder mapping = mappingSupplier.get()) { - PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate); - putMappingRequest.type(ElasticsearchMappings.DOC_TYPE); - putMappingRequest.source(mapping); - executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest, - ActionListener.wrap(response -> { - if (response.isAcknowledged()) { - listener.onResponse(true); - } else { - listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices " - + Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged")); - } - }, listener::onFailure)); - } catch (IOException e) { - listener.onFailure(e); - } - } else { - logger.trace("Mappings are uptodate."); - listener.onResponse(true); - } - } - public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecutor { private static final Logger logger = LogManager.getLogger(OpenJobPersistentTasksExecutor.class); @@ -798,6 +687,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut private volatile int maxConcurrentJobAllocations; private volatile int maxMachineMemoryPercent; private volatile int maxLazyMLNodes; + private volatile ClusterState clusterState; public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService, AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker, @@ -815,6 +705,7 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes); + clusterService.addListener(event -> clusterState = event.state()); } @Override @@ -876,7 +767,7 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara } String jobId = jobTask.getJobId(); - autodetectProcessManager.openJob(jobTask, e2 -> { + autodetectProcessManager.openJob(jobTask, clusterState, e2 -> { if (e2 == null) { FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(new String[]{jobId}); executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 470efeebdf32a..0f0babb7848c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; @@ -35,6 +36,8 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -413,68 +416,75 @@ public void onFailure(Exception e) { } } - public void openJob(JobTask jobTask, Consumer closeHandler) { + public void openJob(JobTask jobTask, ClusterState clusterState, Consumer closeHandler) { String jobId = jobTask.getJobId(); logger.info("Opening job [{}]", jobId); - jobManager.getJob(jobId, ActionListener.wrap( - job -> { - if (job.getJobVersion() == null) { - closeHandler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId + // Start the process + ActionListener resultsMappingUpdateHandler = ActionListener.wrap( + r -> { + jobManager.getJob(jobId, ActionListener.wrap( + job -> { + if (job.getJobVersion() == null) { + closeHandler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId + "] because jobs created prior to version 5.5 are not supported")); - return; - } - - - processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask)); - jobResultsProvider.getAutodetectParams(job, params -> { - // We need to fork, otherwise we restore model state from a network thread (several GET api calls): - threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - closeHandler.accept(e); - } + return; + } - @Override - protected void doRun() { - ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId()); - if (processContext == null) { - logger.debug("Aborted opening job [{}] as it has been closed", jobId); - return; - } - if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) { - logger.debug("Cannot open job [{}] when its state is [{}]", - jobId, processContext.getState().getClass().getName()); - return; + processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask)); + jobResultsProvider.getAutodetectParams(job, params -> { + // We need to fork, otherwise we restore model state from a network thread (several GET api calls): + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + closeHandler.accept(e); } - try { - createProcessAndSetRunning(processContext, job, params, closeHandler); - processContext.getAutodetectCommunicator().init(params.modelSnapshot()); - setJobState(jobTask, JobState.OPENED); - } catch (Exception e1) { - // No need to log here as the persistent task framework will log it + @Override + protected void doRun() { + ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId()); + if (processContext == null) { + logger.debug("Aborted opening job [{}] as it has been closed", jobId); + return; + } + if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) { + logger.debug("Cannot open job [{}] when its state is [{}]", + jobId, processContext.getState().getClass().getName()); + return; + } + try { - // Don't leave a partially initialised process hanging around - processContext.newKillBuilder() - .setAwaitCompletion(false) - .setFinish(false) - .kill(); - processByAllocation.remove(jobTask.getAllocationId()); - } finally { - setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); + createProcessAndSetRunning(processContext, job, params, closeHandler); + processContext.getAutodetectCommunicator().init(params.modelSnapshot()); + setJobState(jobTask, JobState.OPENED); + } catch (Exception e1) { + // No need to log here as the persistent task framework will log it + try { + // Don't leave a partially initialised process hanging around + processContext.newKillBuilder() + .setAwaitCompletion(false) + .setFinish(false) + .kill(); + processByAllocation.remove(jobTask.getAllocationId()); + } finally { + setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); + } } } - } + }); + }, e1 -> { + logger.warn("Failed to gather information required to open job [" + jobId + "]", e1); + setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); }); - }, e1 -> { - logger.warn("Failed to gather information required to open job [" + jobId + "]", e1); - setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); - }); - }, - closeHandler - )); - + }, + closeHandler + )); + }, + closeHandler); + + // Try adding the results doc mapping - this updates to the latest version if an old mapping is present + ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), + ElasticsearchMappings::resultsMapping, client, logger, clusterState, resultsMappingUpdateHandler); } private void createProcessAndSetRunning(ProcessContext processContext, Job job, AutodetectParams params, Consumer handler) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index f6392c629f2ec..8f22673e27d47 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -11,7 +11,6 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -32,7 +31,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; @@ -47,14 +45,12 @@ import org.elasticsearch.xpack.core.ml.job.config.RuleCondition; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; -import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.junit.Before; -import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; @@ -494,80 +490,6 @@ public void testVerifyIndicesPrimaryShardsAreActive() { assertEquals(indexToRemove, result.get(0)); } - public void testMappingRequiresUpdateNoMapping() throws IOException { - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); - ClusterState cs = csBuilder.build(); - String[] indices = new String[] { "no_index" }; - - assertArrayEquals(new String[] { "no_index" }, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateNullMapping() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null)); - String[] indices = new String[] { "null_index" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateNoVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD")); - String[] indices = new String[] { "no_version_field" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateRecentMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString())); - String[] indices = new String[] { "version_current" }; - assertArrayEquals(new String[] {}, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData( - Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0"))); - String[] indices = new String[] { "version_nested" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateOldMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_54", Version.V_5_4_0.toString())); - String[] indices = new String[] { "version_54" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateBogusMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0")); - String[] indices = new String[] { "version_bogus" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateNewerMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT)); - String[] indices = new String[] { "version_newer" }; - assertArrayEquals(new String[] {}, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion(), - logger)); - } - - public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT)); - String[] indices = new String[] { "version_newer_minor" }; - assertArrayEquals(new String[] {}, - TransportOpenJobAction.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion(), logger)); - } - - public void testMappingRequiresUpdateSomeVersionMix() throws IOException { - Map versionMix = new HashMap<>(); - versionMix.put("version_54", Version.V_5_4_0); - versionMix.put("version_current", Version.CURRENT); - versionMix.put("version_null", null); - versionMix.put("version_current2", Version.CURRENT); - versionMix.put("version_bogus", "0.0.0"); - versionMix.put("version_current3", Version.CURRENT); - versionMix.put("version_bogus2", "0.0.0"); - - ClusterState cs = getClusterStateWithMappingsWithMetaData(versionMix); - String[] indices = new String[] { "version_54", "version_null", "version_bogus", "version_bogus2" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - public void testNodeNameAndVersion() { TransportAddress ta = new TransportAddress(InetAddress.getLoopbackAddress(), 9300); Map attributes = new HashMap<>(); @@ -652,42 +574,6 @@ private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingT } } - private ClusterState getClusterStateWithMappingsWithMetaData(Map namesAndVersions) throws IOException { - MetaData.Builder metaDataBuilder = MetaData.builder(); - - for (Map.Entry entry : namesAndVersions.entrySet()) { - - String indexName = entry.getKey(); - Object version = entry.getValue(); - - IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); - indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); - - Map mapping = new HashMap<>(); - Map properties = new HashMap<>(); - for (int i = 0; i < 10; i++) { - properties.put("field" + i, Collections.singletonMap("type", "string")); - } - mapping.put("properties", properties); - - Map meta = new HashMap<>(); - if (version != null && version.equals("NO_VERSION_FIELD") == false) { - meta.put("version", version); - } - mapping.put("_meta", meta); - - indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping)); - - metaDataBuilder.put(indexMetaData); - } - MetaData metaData = metaDataBuilder.build(); - - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); - csBuilder.metaData(metaData); - return csBuilder.build(); - } - private static Job jobWithRules(String jobId) { DetectionRule rule = new DetectionRule.Builder(Collections.singletonList( new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 32f6d2fa88311..69c9ec3eaaa5c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -8,6 +8,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -31,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -61,6 +65,8 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -106,6 +112,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private JobDataCountsPersister jobDataCountsPersister; private NormalizerFactory normalizerFactory; private Auditor auditor; + private ClusterState clusterState; private DataCounts dataCounts = new DataCounts("foo"); private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build(); @@ -125,6 +132,12 @@ public void setup() throws Exception { jobDataCountsPersister = mock(JobDataCountsPersister.class); normalizerFactory = mock(NormalizerFactory.class); auditor = mock(Auditor.class); + MetaData metaData = mock(MetaData.class); + SortedMap aliasOrIndexSortedMap = new TreeMap<>(); + aliasOrIndexSortedMap.put(AnomalyDetectorsIndex.jobStateIndexName(), mock(AliasOrIndex.Index.class)); + when(metaData.getAliasAndIndexLookup()).thenReturn(aliasOrIndexSortedMap); + clusterState = mock(ClusterState.class); + when(clusterState.metaData()).thenReturn(metaData); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") @@ -186,7 +199,7 @@ public void testOpenJob() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); when(jobTask.getAllocationId()).thenReturn(1L); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); assertEquals(1, manager.numberOfOpenJobs()); assertTrue(manager.jobHasActiveAutodetectProcess(jobTask)); verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L)), any()); @@ -212,7 +225,7 @@ public void testOpenJob_withoutVersion() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn(job.getId()); AtomicReference errorHolder = new AtomicReference<>(); - manager.openJob(jobTask, errorHolder::set); + manager.openJob(jobTask, clusterState, errorHolder::set); Exception error = errorHolder.get(); assertThat(error, is(notNullValue())); assertThat(error.getMessage(), equalTo("Cannot open job [no_version] because jobs created prior to version 5.5 are not supported")); @@ -258,22 +271,22 @@ public void testOpenJob_exceedMaxNumJobs() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("bar"); when(jobTask.getAllocationId()).thenReturn(1L); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("baz"); when(jobTask.getAllocationId()).thenReturn(2L); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); assertEquals(3, manager.numberOfOpenJobs()); Exception[] holder = new Exception[1]; jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foobar"); when(jobTask.getAllocationId()).thenReturn(3L); - manager.openJob(jobTask, e -> holder[0] = e); + manager.openJob(jobTask, clusterState, e -> holder[0] = e); Exception e = holder[0]; assertEquals("max running job capacity [3] reached", e.getMessage()); @@ -282,7 +295,7 @@ public void testOpenJob_exceedMaxNumJobs() { when(jobTask.getJobId()).thenReturn("baz"); manager.closeJob(jobTask, false, null); assertEquals(2, manager.numberOfOpenJobs()); - manager.openJob(jobTask, e1 -> {}); + manager.openJob(jobTask, clusterState, e1 -> {}); assertEquals(3, manager.numberOfOpenJobs()); } @@ -294,7 +307,7 @@ public void testProcessData() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty()); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), params, (dataCounts1, e) -> {}); assertEquals(1, manager.numberOfOpenJobs()); @@ -317,7 +330,7 @@ public void testProcessDataThrowsElasticsearchStatusException_onIoException() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); Exception[] holder = new Exception[1]; manager.processData(jobTask, analysisRegistry, inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e); assertNotNull(holder[0]); @@ -330,7 +343,7 @@ public void testCloseJob() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -358,7 +371,7 @@ public void testCanCloseClosingJob() throws Exception { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -406,7 +419,7 @@ public void testCanKillClosingJob() throws Exception { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -435,7 +448,7 @@ public void testBucketResetMessageIsSent() { InputStream inputStream = createInputStream(""); JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, inputStream, xContentType, params, (dataCounts1, e) -> {}); verify(communicator).writeToJob(same(inputStream), same(analysisRegistry), same(xContentType), same(params), any()); } @@ -447,7 +460,7 @@ public void testFlush() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); InputStream inputStream = createInputStream(""); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, inputStream, randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -487,7 +500,7 @@ public void testCloseThrows() { // create a jobtask JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> { }); @@ -527,7 +540,7 @@ public void testJobHasActiveAutodetectProcess() { when(jobTask.getJobId()).thenReturn("foo"); assertFalse(manager.jobHasActiveAutodetectProcess(jobTask)); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -545,7 +558,7 @@ public void testKillKillsAutodetectProcess() throws IOException { when(jobTask.getJobId()).thenReturn("foo"); assertFalse(manager.jobHasActiveAutodetectProcess(jobTask)); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -579,7 +592,7 @@ public void testProcessData_GivenStateNotOpened() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); InputStream inputStream = createInputStream(""); DataCounts[] dataCounts = new DataCounts[1]; manager.processData(jobTask, analysisRegistry, inputStream, @@ -724,7 +737,7 @@ private AutodetectProcessManager createManagerAndCallProcessData(AutodetectCommu AutodetectProcessManager manager = createManager(communicator); JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn(jobId); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts, e) -> {}); return manager; diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java new file mode 100644 index 0000000000000..5143a57903d87 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.upgrades; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ml.job.config.AnalysisConfig; +import org.elasticsearch.client.ml.job.config.DataDescription; +import org.elasticsearch.client.ml.job.config.Detector; +import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.test.rest.XPackRestTestHelper; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class MlMappingsUpgradeIT extends AbstractUpgradeTestCase { + + private static final String JOB_ID = "ml-mappings-upgrade-job"; + + @Override + protected Collection templatesToWaitFor() { + List templatesToWaitFor = XPackRestTestHelper.ML_POST_V660_TEMPLATES; + + // If upgrading from a version prior to v6.6.0 the set of templates + // to wait for is different + if (CLUSTER_TYPE == ClusterType.OLD) { + if (UPGRADED_FROM_VERSION.before(Version.V_6_6_0)) { + templatesToWaitFor = XPackRestTestHelper.ML_PRE_V660_TEMPLATES; + } + } + + return Stream.concat(templatesToWaitFor.stream(), + super.templatesToWaitFor().stream()).collect(Collectors.toSet()); + } + + /** + * The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results + * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade + */ + public void testMappingsUpgrade() throws Exception { + + switch (CLUSTER_TYPE) { + case OLD: + createAndOpenTestJob(); + break; + case MIXED: + // We don't know whether the job is on an old or upgraded node, so cannot assert that the mappings have been upgraded + break; + case UPGRADED: + assertUpgradedMappings(); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + } + + private void createAndOpenTestJob() throws IOException { + + Detector.Builder d = new Detector.Builder("metric", "responsetime"); + d.setByFieldName("airline"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(TimeValue.timeValueMinutes(10)); + Job.Builder job = new Job.Builder(JOB_ID); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(new DataDescription.Builder()); + + Request putJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + JOB_ID); + putJob.setJsonEntity(Strings.toString(job.build())); + Response response = client().performRequest(putJob); + assertEquals(200, response.getStatusLine().getStatusCode()); + + Request openJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + JOB_ID + "/_open"); + response = client().performRequest(openJob); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + @SuppressWarnings("unchecked") + private void assertUpgradedMappings() throws Exception { + + assertBusy(() -> { + Request getMappings = new Request("GET", AnomalyDetectorsIndex.resultsWriteAlias(JOB_ID) + "/_mappings"); + Response response = client().performRequest(getMappings); + + Map responseLevel = entityAsMap(response); + assertNotNull(responseLevel); + Map indexLevel = (Map) responseLevel.get(".ml-anomalies-shared"); + assertNotNull(indexLevel); + Map mappingsLevel = (Map) indexLevel.get("mappings"); + assertNotNull(mappingsLevel); + Map metaLevel = (Map) mappingsLevel.get("_meta"); + assertEquals(Collections.singletonMap("version", Version.CURRENT.toString()), metaLevel); + Map propertiesLevel = (Map) mappingsLevel.get("properties"); + assertNotNull(propertiesLevel); + // TODO: as the years go by, the field we assert on here should be changed + // to the most recent field we've added that is NOT of type "keyword" + Map fieldLevel = (Map) propertiesLevel.get("multi_bucket_impact"); + assertEquals(Collections.singletonMap("type", "double"), fieldLevel); + }); + } +}