diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index 1b314a4a2f3cb..ad80e2d116761 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -5,8 +5,24 @@ */ package org.elasticsearch.xpack.core.ml.job.persistence; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; +import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig; @@ -38,10 +54,16 @@ import org.elasticsearch.xpack.core.ml.notifications.AuditMessage; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** * Static methods to create Elasticsearch index mappings for the autodetect @@ -107,6 +129,8 @@ public class ElasticsearchMappings { static final String RAW = "raw"; + private static final Logger logger = LogManager.getLogger(ElasticsearchMappings.class); + private ElasticsearchMappings() { } @@ -968,4 +992,94 @@ public static XContentBuilder auditMessageMapping() throws IOException { .endObject() .endObject(); } + + static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion) throws IOException { + List indicesToUpdate = new ArrayList<>(); + + ImmutableOpenMap> currentMapping = state.metaData().findMappings(concreteIndices, + new String[] {DOC_TYPE}, MapperPlugin.NOOP_FIELD_FILTER); + + for (String index : concreteIndices) { + ImmutableOpenMap innerMap = currentMapping.get(index); + if (innerMap != null) { + MappingMetaData metaData = innerMap.get(DOC_TYPE); + try { + @SuppressWarnings("unchecked") + Map meta = (Map) metaData.sourceAsMap().get("_meta"); + if (meta != null) { + String versionString = (String) meta.get("version"); + if (versionString == null) { + logger.info("Version of mappings for [{}] not found, recreating", index); + indicesToUpdate.add(index); + continue; + } + + Version mappingVersion = Version.fromString(versionString); + + if (mappingVersion.onOrAfter(minVersion)) { + continue; + } else { + logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT); + indicesToUpdate.add(index); + continue; + } + } else { + logger.info("Version of mappings for [{}] not found, recreating", index); + indicesToUpdate.add(index); + continue; + } + } catch (Exception e) { + logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e); + indicesToUpdate.add(index); + continue; + } + } else { + logger.info("No mappings found for [{}], recreating", index); + indicesToUpdate.add(index); + } + } + return indicesToUpdate.toArray(new String[indicesToUpdate.size()]); + } + + public static void addDocMappingIfMissing(String alias, CheckedSupplier mappingSupplier, + Client client, ClusterState state, ActionListener listener) { + AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); + if (aliasOrIndex == null) { + // The index has never been created yet + listener.onResponse(true); + return; + } + String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName) + .toArray(String[]::new); + + String[] indicesThatRequireAnUpdate; + try { + indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT); + } catch (IOException e) { + listener.onFailure(e); + return; + } + + if (indicesThatRequireAnUpdate.length > 0) { + try (XContentBuilder mapping = mappingSupplier.get()) { + PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate); + putMappingRequest.type(DOC_TYPE); + putMappingRequest.source(mapping); + executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest, + ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + listener.onResponse(true); + } else { + listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices " + + Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged")); + } + }, listener::onFailure)); + } catch (IOException e) { + listener.onFailure(e); + } + } else { + logger.trace("Mappings are up to date."); + listener.onResponse(true); + } + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java index e4ce536a3ccf6..b6fec1cb28d1b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java @@ -9,10 +9,18 @@ import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; @@ -30,6 +38,8 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -128,6 +138,110 @@ public void testTermFieldMapping() throws IOException { assertNull(instanceMapping); } + + public void testMappingRequiresUpdateNoMapping() throws IOException { + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + ClusterState cs = csBuilder.build(); + String[] indices = new String[] { "no_index" }; + + assertArrayEquals(new String[] { "no_index" }, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + public void testMappingRequiresUpdateNullMapping() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null)); + String[] indices = new String[] { "null_index" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + public void testMappingRequiresUpdateNoVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD")); + String[] indices = new String[] { "no_version_field" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + public void testMappingRequiresUpdateRecentMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString())); + String[] indices = new String[] { "version_current" }; + assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData( + Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0"))); + String[] indices = new String[] { "version_nested" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + public void testMappingRequiresUpdateBogusMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0")); + String[] indices = new String[] { "version_bogus" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + public void testMappingRequiresUpdateNewerMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT)); + String[] indices = new String[] { "version_newer" }; + assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion())); + } + + public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT)); + String[] indices = new String[] { "version_newer_minor" }; + assertArrayEquals(new String[] {}, + ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion())); + } + + public void testMappingRequiresUpdateSomeVersionMix() throws IOException { + Map versionMix = new HashMap<>(); + versionMix.put("version_54", Version.V_5_4_0); + versionMix.put("version_current", Version.CURRENT); + versionMix.put("version_null", null); + versionMix.put("version_current2", Version.CURRENT); + versionMix.put("version_bogus", "0.0.0"); + versionMix.put("version_current3", Version.CURRENT); + versionMix.put("version_bogus2", "0.0.0"); + + ClusterState cs = getClusterStateWithMappingsWithMetaData(versionMix); + String[] indices = new String[] { "version_54", "version_null", "version_bogus", "version_bogus2" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + private ClusterState getClusterStateWithMappingsWithMetaData(Map namesAndVersions) throws IOException { + MetaData.Builder metaDataBuilder = MetaData.builder(); + + for (Map.Entry entry : namesAndVersions.entrySet()) { + + String indexName = entry.getKey(); + Object version = entry.getValue(); + + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); + indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); + + Map mapping = new HashMap<>(); + Map properties = new HashMap<>(); + for (int i = 0; i < 10; i++) { + properties.put("field" + i, Collections.singletonMap("type", "string")); + } + mapping.put("properties", properties); + + Map meta = new HashMap<>(); + if (version != null && version.equals("NO_VERSION_FIELD") == false) { + meta.put("version", version); + } + mapping.put("_meta", meta); + + indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping)); + + metaDataBuilder.put(indexMetaData); + } + MetaData metaData = metaDataBuilder.build(); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.metaData(metaData); + return csBuilder.build(); + } + private Set collectResultsDocFieldNames() throws IOException { // Only the mappings for the results index should be added below. Do NOT add mappings for other indexes here. return collectFieldNames(ElasticsearchMappings.resultsMapping()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 3b8a7d97c25dd..c7096471023c3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -7,14 +7,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -23,23 +20,16 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.AliasOrIndex; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.Index; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -47,7 +37,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.persistent.PersistentTasksService; -import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -78,9 +67,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; -import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -433,55 +420,6 @@ private static boolean jobHasRules(Job job) { return job.getAnalysisConfig().getDetectors().stream().anyMatch(d -> d.getRules().isEmpty() == false); } - public static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion, Logger logger) - throws IOException { - - List indicesToUpdate = new ArrayList<>(); - - ImmutableOpenMap> currentMapping = state.metaData().findMappings(concreteIndices, - new String[] { ElasticsearchMappings.DOC_TYPE }, MapperPlugin.NOOP_FIELD_FILTER); - - for (String index : concreteIndices) { - ImmutableOpenMap innerMap = currentMapping.get(index); - if (innerMap != null) { - MappingMetaData metaData = innerMap.get(ElasticsearchMappings.DOC_TYPE); - try { - Map meta = (Map) metaData.sourceAsMap().get("_meta"); - if (meta != null) { - String versionString = (String) meta.get("version"); - if (versionString == null) { - logger.info("Version of mappings for [{}] not found, recreating", index); - indicesToUpdate.add(index); - continue; - } - - Version mappingVersion = Version.fromString(versionString); - - if (mappingVersion.onOrAfter(minVersion)) { - continue; - } else { - logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT); - indicesToUpdate.add(index); - continue; - } - } else { - logger.info("Version of mappings for [{}] not found, recreating", index); - indicesToUpdate.add(index); - continue; - } - } catch (Exception e) { - logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e); - indicesToUpdate.add(index); - continue; - } - } else { - logger.info("No mappings found for [{}], recreating", index); - indicesToUpdate.add(index); - } - } - return indicesToUpdate.toArray(new String[indicesToUpdate.size()]); - } - @Override protected String executor() { // This api doesn't do heavy or blocking operations (just delegates PersistentTasksService), @@ -613,25 +551,18 @@ public void onFailure(Exception e) { ); // Try adding state doc mapping - ActionListener resultsPutMappingHandler = ActionListener.wrap( + ActionListener getJobHandler = ActionListener.wrap( response -> { - addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings::stateMapping, - state, missingMappingsListener); + ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), + ElasticsearchMappings::stateMapping, client, state, missingMappingsListener); }, listener::onFailure ); // Get the job config jobManager.getJob(jobParams.getJobId(), ActionListener.wrap( job -> { - try { - jobParams.setJob(job); - - // Try adding results doc mapping - addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobParams.getJobId()), - ElasticsearchMappings::resultsMapping, state, resultsPutMappingHandler); - } catch (Exception e) { - listener.onFailure(e); - } + jobParams.setJob(job); + getJobHandler.onResponse(null); }, listener::onFailure )); @@ -738,48 +669,6 @@ public void onFailure(Exception e) { ); } - private void addDocMappingIfMissing(String alias, CheckedSupplier mappingSupplier, ClusterState state, - ActionListener listener) { - AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); - if (aliasOrIndex == null) { - // The index has never been created yet - listener.onResponse(true); - return; - } - String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName) - .toArray(String[]::new); - - String[] indicesThatRequireAnUpdate; - try { - indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT, logger); - } catch (IOException e) { - listener.onFailure(e); - return; - } - - if (indicesThatRequireAnUpdate.length > 0) { - try (XContentBuilder mapping = mappingSupplier.get()) { - PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate); - putMappingRequest.type(ElasticsearchMappings.DOC_TYPE); - putMappingRequest.source(mapping); - executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest, - ActionListener.wrap(response -> { - if (response.isAcknowledged()) { - listener.onResponse(true); - } else { - listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices " - + Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged")); - } - }, listener::onFailure)); - } catch (IOException e) { - listener.onFailure(e); - } - } else { - logger.trace("Mappings are uptodate."); - listener.onResponse(true); - } - } - public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecutor { private static final Logger logger = LogManager.getLogger(OpenJobPersistentTasksExecutor.class); @@ -798,6 +687,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut private volatile int maxConcurrentJobAllocations; private volatile int maxMachineMemoryPercent; private volatile int maxLazyMLNodes; + private volatile ClusterState clusterState; public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService, AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker, @@ -815,6 +705,7 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes); + clusterService.addListener(event -> clusterState = event.state()); } @Override @@ -876,7 +767,7 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara } String jobId = jobTask.getJobId(); - autodetectProcessManager.openJob(jobTask, e2 -> { + autodetectProcessManager.openJob(jobTask, clusterState, e2 -> { if (e2 == null) { FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(new String[]{jobId}); executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 470efeebdf32a..cc6dcafaa1f20 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; @@ -35,6 +36,8 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -413,68 +416,75 @@ public void onFailure(Exception e) { } } - public void openJob(JobTask jobTask, Consumer closeHandler) { + public void openJob(JobTask jobTask, ClusterState clusterState, Consumer closeHandler) { String jobId = jobTask.getJobId(); logger.info("Opening job [{}]", jobId); - jobManager.getJob(jobId, ActionListener.wrap( - job -> { - if (job.getJobVersion() == null) { - closeHandler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId + // Start the process + ActionListener resultsMappingUpdateHandler = ActionListener.wrap( + r -> { + jobManager.getJob(jobId, ActionListener.wrap( + job -> { + if (job.getJobVersion() == null) { + closeHandler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId + "] because jobs created prior to version 5.5 are not supported")); - return; - } - - - processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask)); - jobResultsProvider.getAutodetectParams(job, params -> { - // We need to fork, otherwise we restore model state from a network thread (several GET api calls): - threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - closeHandler.accept(e); - } + return; + } - @Override - protected void doRun() { - ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId()); - if (processContext == null) { - logger.debug("Aborted opening job [{}] as it has been closed", jobId); - return; - } - if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) { - logger.debug("Cannot open job [{}] when its state is [{}]", - jobId, processContext.getState().getClass().getName()); - return; + processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask)); + jobResultsProvider.getAutodetectParams(job, params -> { + // We need to fork, otherwise we restore model state from a network thread (several GET api calls): + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + closeHandler.accept(e); } - try { - createProcessAndSetRunning(processContext, job, params, closeHandler); - processContext.getAutodetectCommunicator().init(params.modelSnapshot()); - setJobState(jobTask, JobState.OPENED); - } catch (Exception e1) { - // No need to log here as the persistent task framework will log it + @Override + protected void doRun() { + ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId()); + if (processContext == null) { + logger.debug("Aborted opening job [{}] as it has been closed", jobId); + return; + } + if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) { + logger.debug("Cannot open job [{}] when its state is [{}]", + jobId, processContext.getState().getClass().getName()); + return; + } + try { - // Don't leave a partially initialised process hanging around - processContext.newKillBuilder() - .setAwaitCompletion(false) - .setFinish(false) - .kill(); - processByAllocation.remove(jobTask.getAllocationId()); - } finally { - setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); + createProcessAndSetRunning(processContext, job, params, closeHandler); + processContext.getAutodetectCommunicator().init(params.modelSnapshot()); + setJobState(jobTask, JobState.OPENED); + } catch (Exception e1) { + // No need to log here as the persistent task framework will log it + try { + // Don't leave a partially initialised process hanging around + processContext.newKillBuilder() + .setAwaitCompletion(false) + .setFinish(false) + .kill(); + processByAllocation.remove(jobTask.getAllocationId()); + } finally { + setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); + } } } - } + }); + }, e1 -> { + logger.warn("Failed to gather information required to open job [" + jobId + "]", e1); + setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); }); - }, e1 -> { - logger.warn("Failed to gather information required to open job [" + jobId + "]", e1); - setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); - }); - }, - closeHandler - )); - + }, + closeHandler + )); + }, + closeHandler); + + // Try adding the results doc mapping - this updates to the latest version if an old mapping is present + ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), + ElasticsearchMappings::resultsMapping, client, clusterState, resultsMappingUpdateHandler); } private void createProcessAndSetRunning(ProcessContext processContext, Job job, AutodetectParams params, Consumer handler) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index f6392c629f2ec..8f22673e27d47 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -11,7 +11,6 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -32,7 +31,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; @@ -47,14 +45,12 @@ import org.elasticsearch.xpack.core.ml.job.config.RuleCondition; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; -import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.junit.Before; -import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; @@ -494,80 +490,6 @@ public void testVerifyIndicesPrimaryShardsAreActive() { assertEquals(indexToRemove, result.get(0)); } - public void testMappingRequiresUpdateNoMapping() throws IOException { - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); - ClusterState cs = csBuilder.build(); - String[] indices = new String[] { "no_index" }; - - assertArrayEquals(new String[] { "no_index" }, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateNullMapping() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null)); - String[] indices = new String[] { "null_index" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateNoVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD")); - String[] indices = new String[] { "no_version_field" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateRecentMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString())); - String[] indices = new String[] { "version_current" }; - assertArrayEquals(new String[] {}, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData( - Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0"))); - String[] indices = new String[] { "version_nested" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateOldMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_54", Version.V_5_4_0.toString())); - String[] indices = new String[] { "version_54" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateBogusMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0")); - String[] indices = new String[] { "version_bogus" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateNewerMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT)); - String[] indices = new String[] { "version_newer" }; - assertArrayEquals(new String[] {}, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion(), - logger)); - } - - public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT)); - String[] indices = new String[] { "version_newer_minor" }; - assertArrayEquals(new String[] {}, - TransportOpenJobAction.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion(), logger)); - } - - public void testMappingRequiresUpdateSomeVersionMix() throws IOException { - Map versionMix = new HashMap<>(); - versionMix.put("version_54", Version.V_5_4_0); - versionMix.put("version_current", Version.CURRENT); - versionMix.put("version_null", null); - versionMix.put("version_current2", Version.CURRENT); - versionMix.put("version_bogus", "0.0.0"); - versionMix.put("version_current3", Version.CURRENT); - versionMix.put("version_bogus2", "0.0.0"); - - ClusterState cs = getClusterStateWithMappingsWithMetaData(versionMix); - String[] indices = new String[] { "version_54", "version_null", "version_bogus", "version_bogus2" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - public void testNodeNameAndVersion() { TransportAddress ta = new TransportAddress(InetAddress.getLoopbackAddress(), 9300); Map attributes = new HashMap<>(); @@ -652,42 +574,6 @@ private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingT } } - private ClusterState getClusterStateWithMappingsWithMetaData(Map namesAndVersions) throws IOException { - MetaData.Builder metaDataBuilder = MetaData.builder(); - - for (Map.Entry entry : namesAndVersions.entrySet()) { - - String indexName = entry.getKey(); - Object version = entry.getValue(); - - IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); - indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); - - Map mapping = new HashMap<>(); - Map properties = new HashMap<>(); - for (int i = 0; i < 10; i++) { - properties.put("field" + i, Collections.singletonMap("type", "string")); - } - mapping.put("properties", properties); - - Map meta = new HashMap<>(); - if (version != null && version.equals("NO_VERSION_FIELD") == false) { - meta.put("version", version); - } - mapping.put("_meta", meta); - - indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping)); - - metaDataBuilder.put(indexMetaData); - } - MetaData metaData = metaDataBuilder.build(); - - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); - csBuilder.metaData(metaData); - return csBuilder.build(); - } - private static Job jobWithRules(String jobId) { DetectionRule rule = new DetectionRule.Builder(Collections.singletonList( new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 32f6d2fa88311..69c9ec3eaaa5c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -8,6 +8,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -31,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -61,6 +65,8 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -106,6 +112,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private JobDataCountsPersister jobDataCountsPersister; private NormalizerFactory normalizerFactory; private Auditor auditor; + private ClusterState clusterState; private DataCounts dataCounts = new DataCounts("foo"); private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build(); @@ -125,6 +132,12 @@ public void setup() throws Exception { jobDataCountsPersister = mock(JobDataCountsPersister.class); normalizerFactory = mock(NormalizerFactory.class); auditor = mock(Auditor.class); + MetaData metaData = mock(MetaData.class); + SortedMap aliasOrIndexSortedMap = new TreeMap<>(); + aliasOrIndexSortedMap.put(AnomalyDetectorsIndex.jobStateIndexName(), mock(AliasOrIndex.Index.class)); + when(metaData.getAliasAndIndexLookup()).thenReturn(aliasOrIndexSortedMap); + clusterState = mock(ClusterState.class); + when(clusterState.metaData()).thenReturn(metaData); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") @@ -186,7 +199,7 @@ public void testOpenJob() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); when(jobTask.getAllocationId()).thenReturn(1L); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); assertEquals(1, manager.numberOfOpenJobs()); assertTrue(manager.jobHasActiveAutodetectProcess(jobTask)); verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L)), any()); @@ -212,7 +225,7 @@ public void testOpenJob_withoutVersion() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn(job.getId()); AtomicReference errorHolder = new AtomicReference<>(); - manager.openJob(jobTask, errorHolder::set); + manager.openJob(jobTask, clusterState, errorHolder::set); Exception error = errorHolder.get(); assertThat(error, is(notNullValue())); assertThat(error.getMessage(), equalTo("Cannot open job [no_version] because jobs created prior to version 5.5 are not supported")); @@ -258,22 +271,22 @@ public void testOpenJob_exceedMaxNumJobs() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("bar"); when(jobTask.getAllocationId()).thenReturn(1L); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("baz"); when(jobTask.getAllocationId()).thenReturn(2L); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); assertEquals(3, manager.numberOfOpenJobs()); Exception[] holder = new Exception[1]; jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foobar"); when(jobTask.getAllocationId()).thenReturn(3L); - manager.openJob(jobTask, e -> holder[0] = e); + manager.openJob(jobTask, clusterState, e -> holder[0] = e); Exception e = holder[0]; assertEquals("max running job capacity [3] reached", e.getMessage()); @@ -282,7 +295,7 @@ public void testOpenJob_exceedMaxNumJobs() { when(jobTask.getJobId()).thenReturn("baz"); manager.closeJob(jobTask, false, null); assertEquals(2, manager.numberOfOpenJobs()); - manager.openJob(jobTask, e1 -> {}); + manager.openJob(jobTask, clusterState, e1 -> {}); assertEquals(3, manager.numberOfOpenJobs()); } @@ -294,7 +307,7 @@ public void testProcessData() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty()); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), params, (dataCounts1, e) -> {}); assertEquals(1, manager.numberOfOpenJobs()); @@ -317,7 +330,7 @@ public void testProcessDataThrowsElasticsearchStatusException_onIoException() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); Exception[] holder = new Exception[1]; manager.processData(jobTask, analysisRegistry, inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e); assertNotNull(holder[0]); @@ -330,7 +343,7 @@ public void testCloseJob() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -358,7 +371,7 @@ public void testCanCloseClosingJob() throws Exception { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -406,7 +419,7 @@ public void testCanKillClosingJob() throws Exception { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -435,7 +448,7 @@ public void testBucketResetMessageIsSent() { InputStream inputStream = createInputStream(""); JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, inputStream, xContentType, params, (dataCounts1, e) -> {}); verify(communicator).writeToJob(same(inputStream), same(analysisRegistry), same(xContentType), same(params), any()); } @@ -447,7 +460,7 @@ public void testFlush() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); InputStream inputStream = createInputStream(""); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, inputStream, randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -487,7 +500,7 @@ public void testCloseThrows() { // create a jobtask JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> { }); @@ -527,7 +540,7 @@ public void testJobHasActiveAutodetectProcess() { when(jobTask.getJobId()).thenReturn("foo"); assertFalse(manager.jobHasActiveAutodetectProcess(jobTask)); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -545,7 +558,7 @@ public void testKillKillsAutodetectProcess() throws IOException { when(jobTask.getJobId()).thenReturn("foo"); assertFalse(manager.jobHasActiveAutodetectProcess(jobTask)); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -579,7 +592,7 @@ public void testProcessData_GivenStateNotOpened() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); InputStream inputStream = createInputStream(""); DataCounts[] dataCounts = new DataCounts[1]; manager.processData(jobTask, analysisRegistry, inputStream, @@ -724,7 +737,7 @@ private AutodetectProcessManager createManagerAndCallProcessData(AutodetectCommu AutodetectProcessManager manager = createManager(communicator); JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn(jobId); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts, e) -> {}); return manager; diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java new file mode 100644 index 0000000000000..465db087af283 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.upgrades; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ml.job.config.AnalysisConfig; +import org.elasticsearch.client.ml.job.config.DataDescription; +import org.elasticsearch.client.ml.job.config.Detector; +import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.test.rest.XPackRestTestHelper; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class MlMappingsUpgradeIT extends AbstractUpgradeTestCase { + + private static final String JOB_ID = "ml-mappings-upgrade-job"; + + @Override + protected Collection templatesToWaitFor() { + List templatesToWaitFor = XPackRestTestHelper.ML_POST_V660_TEMPLATES; + + // If upgrading from a version prior to v6.6.0 the set of templates + // to wait for is different + if (CLUSTER_TYPE == ClusterType.OLD) { + if (UPGRADED_FROM_VERSION.before(Version.V_6_6_0)) { + templatesToWaitFor = XPackRestTestHelper.ML_PRE_V660_TEMPLATES; + } + } + + return Stream.concat(templatesToWaitFor.stream(), + super.templatesToWaitFor().stream()).collect(Collectors.toSet()); + } + + /** + * The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results + * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade + */ + public void testMappingsUpgrade() throws Exception { + + switch (CLUSTER_TYPE) { + case OLD: + createAndOpenTestJob(); + break; + case MIXED: + // We don't know whether the job is on an old or upgraded node, so cannot assert that the mappings have been upgraded + break; + case UPGRADED: + assertUpgradedMappings(); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + } + + private void createAndOpenTestJob() throws IOException { + + Detector.Builder d = new Detector.Builder("metric", "responsetime"); + d.setByFieldName("airline"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(TimeValue.timeValueMinutes(10)); + Job.Builder job = new Job.Builder(JOB_ID); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(new DataDescription.Builder()); + // Use a custom index because other rolling upgrade tests meddle with the shared index + job.setResultsIndexName("mappings-upgrade-test"); + + Request putJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + JOB_ID); + putJob.setJsonEntity(Strings.toString(job.build())); + Response response = client().performRequest(putJob); + assertEquals(200, response.getStatusLine().getStatusCode()); + + Request openJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + JOB_ID + "/_open"); + response = client().performRequest(openJob); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + @SuppressWarnings("unchecked") + private void assertUpgradedMappings() throws Exception { + + assertBusy(() -> { + Request getMappings = new Request("GET", AnomalyDetectorsIndex.resultsWriteAlias(JOB_ID) + "/_mappings"); + Response response = client().performRequest(getMappings); + + Map responseLevel = entityAsMap(response); + assertNotNull(responseLevel); + Map indexLevel = null; + // The name of the concrete index underlying the results index alias may or may not have been changed + // by the upgrade process (depending on what other tests are being run and the order they're run in), + // so navigating to the next level of the tree must account for both cases + for (Map.Entry entry : responseLevel.entrySet()) { + if (entry.getKey().startsWith(".ml-anomalies-") && entry.getKey().contains("mappings-upgrade-test")) { + indexLevel = (Map) entry.getValue(); + break; + } + } + assertNotNull(indexLevel); + Map mappingsLevel = (Map) indexLevel.get("mappings"); + assertNotNull(mappingsLevel); + Map typeLevel = (Map) mappingsLevel.get(ElasticsearchMappings.DOC_TYPE); + assertNotNull(typeLevel); + Map metaLevel = (Map) typeLevel.get("_meta"); + assertEquals(Collections.singletonMap("version", Version.CURRENT.toString()), metaLevel); + Map propertiesLevel = (Map) typeLevel.get("properties"); + assertNotNull(propertiesLevel); + // TODO: as the years go by, the field we assert on here should be changed + // to the most recent field we've added that is NOT of type "keyword" + Map fieldLevel = (Map) propertiesLevel.get("multi_bucket_impact"); + assertEquals(Collections.singletonMap("type", "double"), fieldLevel); + }); + } +}