diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index 1b314a4a2f3cb..ad80e2d116761 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -5,8 +5,24 @@ */ package org.elasticsearch.xpack.core.ml.job.persistence; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; +import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig; @@ -38,10 +54,16 @@ import org.elasticsearch.xpack.core.ml.notifications.AuditMessage; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** * Static methods to create Elasticsearch index mappings for the autodetect @@ -107,6 +129,8 @@ public class ElasticsearchMappings { static final String RAW = "raw"; + private static final Logger logger = LogManager.getLogger(ElasticsearchMappings.class); + private ElasticsearchMappings() { } @@ -968,4 +992,94 @@ public static XContentBuilder auditMessageMapping() throws IOException { .endObject() .endObject(); } + + static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion) throws IOException { + List indicesToUpdate = new ArrayList<>(); + + ImmutableOpenMap> currentMapping = state.metaData().findMappings(concreteIndices, + new String[] {DOC_TYPE}, MapperPlugin.NOOP_FIELD_FILTER); + + for (String index : concreteIndices) { + ImmutableOpenMap innerMap = currentMapping.get(index); + if (innerMap != null) { + MappingMetaData metaData = innerMap.get(DOC_TYPE); + try { + @SuppressWarnings("unchecked") + Map meta = (Map) metaData.sourceAsMap().get("_meta"); + if (meta != null) { + String versionString = (String) meta.get("version"); + if (versionString == null) { + logger.info("Version of mappings for [{}] not found, recreating", index); + indicesToUpdate.add(index); + continue; + } + + Version mappingVersion = Version.fromString(versionString); + + if (mappingVersion.onOrAfter(minVersion)) { + continue; + } else { + logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT); + indicesToUpdate.add(index); + continue; + } + } else { + logger.info("Version of mappings for [{}] not found, recreating", index); + indicesToUpdate.add(index); + continue; + } + } catch (Exception e) { + logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e); + indicesToUpdate.add(index); + continue; + } + } else { + logger.info("No mappings found for [{}], recreating", index); + indicesToUpdate.add(index); + } + } + return indicesToUpdate.toArray(new String[indicesToUpdate.size()]); + } + + public static void addDocMappingIfMissing(String alias, CheckedSupplier mappingSupplier, + Client client, ClusterState state, ActionListener listener) { + AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); + if (aliasOrIndex == null) { + // The index has never been created yet + listener.onResponse(true); + return; + } + String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName) + .toArray(String[]::new); + + String[] indicesThatRequireAnUpdate; + try { + indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT); + } catch (IOException e) { + listener.onFailure(e); + return; + } + + if (indicesThatRequireAnUpdate.length > 0) { + try (XContentBuilder mapping = mappingSupplier.get()) { + PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate); + putMappingRequest.type(DOC_TYPE); + putMappingRequest.source(mapping); + executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest, + ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + listener.onResponse(true); + } else { + listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices " + + Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged")); + } + }, listener::onFailure)); + } catch (IOException e) { + listener.onFailure(e); + } + } else { + logger.trace("Mappings are up to date."); + listener.onResponse(true); + } + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java index e4ce536a3ccf6..b6fec1cb28d1b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java @@ -9,10 +9,18 @@ import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; @@ -30,6 +38,8 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -128,6 +138,110 @@ public void testTermFieldMapping() throws IOException { assertNull(instanceMapping); } + + public void testMappingRequiresUpdateNoMapping() throws IOException { + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + ClusterState cs = csBuilder.build(); + String[] indices = new String[] { "no_index" }; + + assertArrayEquals(new String[] { "no_index" }, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + public void testMappingRequiresUpdateNullMapping() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null)); + String[] indices = new String[] { "null_index" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + public void testMappingRequiresUpdateNoVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD")); + String[] indices = new String[] { "no_version_field" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + public void testMappingRequiresUpdateRecentMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString())); + String[] indices = new String[] { "version_current" }; + assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData( + Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0"))); + String[] indices = new String[] { "version_nested" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + public void testMappingRequiresUpdateBogusMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0")); + String[] indices = new String[] { "version_bogus" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + public void testMappingRequiresUpdateNewerMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT)); + String[] indices = new String[] { "version_newer" }; + assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion())); + } + + public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT)); + String[] indices = new String[] { "version_newer_minor" }; + assertArrayEquals(new String[] {}, + ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion())); + } + + public void testMappingRequiresUpdateSomeVersionMix() throws IOException { + Map versionMix = new HashMap<>(); + versionMix.put("version_54", Version.V_5_4_0); + versionMix.put("version_current", Version.CURRENT); + versionMix.put("version_null", null); + versionMix.put("version_current2", Version.CURRENT); + versionMix.put("version_bogus", "0.0.0"); + versionMix.put("version_current3", Version.CURRENT); + versionMix.put("version_bogus2", "0.0.0"); + + ClusterState cs = getClusterStateWithMappingsWithMetaData(versionMix); + String[] indices = new String[] { "version_54", "version_null", "version_bogus", "version_bogus2" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT)); + } + + private ClusterState getClusterStateWithMappingsWithMetaData(Map namesAndVersions) throws IOException { + MetaData.Builder metaDataBuilder = MetaData.builder(); + + for (Map.Entry entry : namesAndVersions.entrySet()) { + + String indexName = entry.getKey(); + Object version = entry.getValue(); + + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); + indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); + + Map mapping = new HashMap<>(); + Map properties = new HashMap<>(); + for (int i = 0; i < 10; i++) { + properties.put("field" + i, Collections.singletonMap("type", "string")); + } + mapping.put("properties", properties); + + Map meta = new HashMap<>(); + if (version != null && version.equals("NO_VERSION_FIELD") == false) { + meta.put("version", version); + } + mapping.put("_meta", meta); + + indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping)); + + metaDataBuilder.put(indexMetaData); + } + MetaData metaData = metaDataBuilder.build(); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.metaData(metaData); + return csBuilder.build(); + } + private Set collectResultsDocFieldNames() throws IOException { // Only the mappings for the results index should be added below. Do NOT add mappings for other indexes here. return collectFieldNames(ElasticsearchMappings.resultsMapping()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 4d3e6657dcab7..11b01bfff5100 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -7,14 +7,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -24,22 +21,15 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.AliasOrIndex; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.Index; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -47,7 +37,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.persistent.PersistentTasksService; -import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -78,9 +67,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; -import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -432,55 +419,6 @@ private static boolean jobHasRules(Job job) { return job.getAnalysisConfig().getDetectors().stream().anyMatch(d -> d.getRules().isEmpty() == false); } - public static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion, Logger logger) - throws IOException { - - List indicesToUpdate = new ArrayList<>(); - - ImmutableOpenMap> currentMapping = state.metaData().findMappings(concreteIndices, - new String[] { ElasticsearchMappings.DOC_TYPE }, MapperPlugin.NOOP_FIELD_FILTER); - - for (String index : concreteIndices) { - ImmutableOpenMap innerMap = currentMapping.get(index); - if (innerMap != null) { - MappingMetaData metaData = innerMap.get(ElasticsearchMappings.DOC_TYPE); - try { - Map meta = (Map) metaData.sourceAsMap().get("_meta"); - if (meta != null) { - String versionString = (String) meta.get("version"); - if (versionString == null) { - logger.info("Version of mappings for [{}] not found, recreating", index); - indicesToUpdate.add(index); - continue; - } - - Version mappingVersion = Version.fromString(versionString); - - if (mappingVersion.onOrAfter(minVersion)) { - continue; - } else { - logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT); - indicesToUpdate.add(index); - continue; - } - } else { - logger.info("Version of mappings for [{}] not found, recreating", index); - indicesToUpdate.add(index); - continue; - } - } catch (Exception e) { - logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e); - indicesToUpdate.add(index); - continue; - } - } else { - logger.info("No mappings found for [{}], recreating", index); - indicesToUpdate.add(index); - } - } - return indicesToUpdate.toArray(new String[indicesToUpdate.size()]); - } - @Override protected String executor() { // This api doesn't do heavy or blocking operations (just delegates PersistentTasksService), @@ -612,25 +550,18 @@ public void onFailure(Exception e) { ); // Try adding state doc mapping - ActionListener resultsPutMappingHandler = ActionListener.wrap( + ActionListener getJobHandler = ActionListener.wrap( response -> { - addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings::stateMapping, - state, missingMappingsListener); + ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), + ElasticsearchMappings::stateMapping, client, state, missingMappingsListener); }, listener::onFailure ); // Get the job config jobManager.getJob(jobParams.getJobId(), ActionListener.wrap( job -> { - try { - jobParams.setJob(job); - - // Try adding results doc mapping - addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobParams.getJobId()), - ElasticsearchMappings::resultsMapping, state, resultsPutMappingHandler); - } catch (Exception e) { - listener.onFailure(e); - } + jobParams.setJob(job); + getJobHandler.onResponse(null); }, listener::onFailure )); @@ -737,48 +668,6 @@ public void onFailure(Exception e) { ); } - private void addDocMappingIfMissing(String alias, CheckedSupplier mappingSupplier, ClusterState state, - ActionListener listener) { - AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); - if (aliasOrIndex == null) { - // The index has never been created yet - listener.onResponse(true); - return; - } - String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName) - .toArray(String[]::new); - - String[] indicesThatRequireAnUpdate; - try { - indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT, logger); - } catch (IOException e) { - listener.onFailure(e); - return; - } - - if (indicesThatRequireAnUpdate.length > 0) { - try (XContentBuilder mapping = mappingSupplier.get()) { - PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate); - putMappingRequest.type(ElasticsearchMappings.DOC_TYPE); - putMappingRequest.source(mapping); - executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest, - ActionListener.wrap(response -> { - if (response.isAcknowledged()) { - listener.onResponse(true); - } else { - listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices " - + Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged")); - } - }, listener::onFailure)); - } catch (IOException e) { - listener.onFailure(e); - } - } else { - logger.trace("Mappings are uptodate."); - listener.onResponse(true); - } - } - public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecutor { private static final Logger logger = LogManager.getLogger(OpenJobPersistentTasksExecutor.class); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 65fea4cbc8394..f91748ca90973 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -40,6 +40,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -421,7 +422,9 @@ public void onFailure(Exception e) { public void openJob(JobTask jobTask, ClusterState clusterState, Consumer closeHandler) { String jobId = jobTask.getJobId(); logger.info("Opening job [{}]", jobId); - AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, ActionListener.wrap( + + // Start the process + ActionListener stateAliasHandler = ActionListener.wrap( r -> { jobManager.getJob(jobId, ActionListener.wrap( job -> { @@ -431,7 +434,6 @@ public void openJob(JobTask jobTask, ClusterState clusterState, Consumer { // We need to fork, otherwise we restore model state from a network thread (several GET api calls): @@ -481,7 +483,17 @@ protected void doRun() { closeHandler )); }, - closeHandler)); + closeHandler); + + // Make sure the state index and alias exist + ActionListener resultsMappingUpdateHandler = ActionListener.wrap( + ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, stateAliasHandler), + closeHandler + ); + + // Try adding the results doc mapping - this updates to the latest version if an old mapping is present + ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), + ElasticsearchMappings::resultsMapping, client, clusterState, resultsMappingUpdateHandler); } private void createProcessAndSetRunning(ProcessContext processContext, Job job, AutodetectParams params, Consumer handler) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index f74c2fe30efa5..17a73840fc4e0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -15,7 +15,6 @@ import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -38,7 +37,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; @@ -53,7 +51,6 @@ import org.elasticsearch.xpack.core.ml.job.config.RuleCondition; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; -import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -61,7 +58,6 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.junit.Before; -import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; @@ -507,80 +503,6 @@ public void testVerifyIndicesPrimaryShardsAreActive() { assertEquals(indexToRemove, result.get(0)); } - public void testMappingRequiresUpdateNoMapping() throws IOException { - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); - ClusterState cs = csBuilder.build(); - String[] indices = new String[] { "no_index" }; - - assertArrayEquals(new String[] { "no_index" }, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateNullMapping() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null)); - String[] indices = new String[] { "null_index" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateNoVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD")); - String[] indices = new String[] { "no_version_field" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateRecentMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString())); - String[] indices = new String[] { "version_current" }; - assertArrayEquals(new String[] {}, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData( - Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0"))); - String[] indices = new String[] { "version_nested" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateOldMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_54", Version.V_5_4_0.toString())); - String[] indices = new String[] { "version_54" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateBogusMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0")); - String[] indices = new String[] { "version_bogus" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateNewerMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT)); - String[] indices = new String[] { "version_newer" }; - assertArrayEquals(new String[] {}, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion(), - logger)); - } - - public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT)); - String[] indices = new String[] { "version_newer_minor" }; - assertArrayEquals(new String[] {}, - TransportOpenJobAction.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion(), logger)); - } - - public void testMappingRequiresUpdateSomeVersionMix() throws IOException { - Map versionMix = new HashMap<>(); - versionMix.put("version_54", Version.V_5_4_0); - versionMix.put("version_current", Version.CURRENT); - versionMix.put("version_null", null); - versionMix.put("version_current2", Version.CURRENT); - versionMix.put("version_bogus", "0.0.0"); - versionMix.put("version_current3", Version.CURRENT); - versionMix.put("version_bogus2", "0.0.0"); - - ClusterState cs = getClusterStateWithMappingsWithMetaData(versionMix); - String[] indices = new String[] { "version_54", "version_null", "version_bogus", "version_bogus2" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - public void testNodeNameAndVersion() { TransportAddress ta = new TransportAddress(InetAddress.getLoopbackAddress(), 9300); Map attributes = new HashMap<>(); @@ -683,42 +605,6 @@ private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingT } } - private ClusterState getClusterStateWithMappingsWithMetaData(Map namesAndVersions) throws IOException { - MetaData.Builder metaDataBuilder = MetaData.builder(); - - for (Map.Entry entry : namesAndVersions.entrySet()) { - - String indexName = entry.getKey(); - Object version = entry.getValue(); - - IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); - indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); - - Map mapping = new HashMap<>(); - Map properties = new HashMap<>(); - for (int i = 0; i < 10; i++) { - properties.put("field" + i, Collections.singletonMap("type", "string")); - } - mapping.put("properties", properties); - - Map meta = new HashMap<>(); - if (version != null && version.equals("NO_VERSION_FIELD") == false) { - meta.put("version", version); - } - mapping.put("_meta", meta); - - indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping)); - - metaDataBuilder.put(indexMetaData); - } - MetaData metaData = metaDataBuilder.build(); - - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); - csBuilder.metaData(metaData); - return csBuilder.build(); - } - private static Job jobWithRules(String jobId) { DetectionRule rule = new DetectionRule.Builder(Collections.singletonList( new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 97168bf93dcb3..ab78d18eed248 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -141,6 +141,7 @@ public void setup() throws Exception { when(metaData.getAliasAndIndexLookup()).thenReturn(aliasOrIndexSortedMap); clusterState = mock(ClusterState.class); when(clusterState.getMetaData()).thenReturn(metaData); + when(clusterState.metaData()).thenReturn(metaData); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java new file mode 100644 index 0000000000000..465db087af283 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.upgrades; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ml.job.config.AnalysisConfig; +import org.elasticsearch.client.ml.job.config.DataDescription; +import org.elasticsearch.client.ml.job.config.Detector; +import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.test.rest.XPackRestTestHelper; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class MlMappingsUpgradeIT extends AbstractUpgradeTestCase { + + private static final String JOB_ID = "ml-mappings-upgrade-job"; + + @Override + protected Collection templatesToWaitFor() { + List templatesToWaitFor = XPackRestTestHelper.ML_POST_V660_TEMPLATES; + + // If upgrading from a version prior to v6.6.0 the set of templates + // to wait for is different + if (CLUSTER_TYPE == ClusterType.OLD) { + if (UPGRADED_FROM_VERSION.before(Version.V_6_6_0)) { + templatesToWaitFor = XPackRestTestHelper.ML_PRE_V660_TEMPLATES; + } + } + + return Stream.concat(templatesToWaitFor.stream(), + super.templatesToWaitFor().stream()).collect(Collectors.toSet()); + } + + /** + * The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results + * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade + */ + public void testMappingsUpgrade() throws Exception { + + switch (CLUSTER_TYPE) { + case OLD: + createAndOpenTestJob(); + break; + case MIXED: + // We don't know whether the job is on an old or upgraded node, so cannot assert that the mappings have been upgraded + break; + case UPGRADED: + assertUpgradedMappings(); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + } + + private void createAndOpenTestJob() throws IOException { + + Detector.Builder d = new Detector.Builder("metric", "responsetime"); + d.setByFieldName("airline"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(TimeValue.timeValueMinutes(10)); + Job.Builder job = new Job.Builder(JOB_ID); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(new DataDescription.Builder()); + // Use a custom index because other rolling upgrade tests meddle with the shared index + job.setResultsIndexName("mappings-upgrade-test"); + + Request putJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + JOB_ID); + putJob.setJsonEntity(Strings.toString(job.build())); + Response response = client().performRequest(putJob); + assertEquals(200, response.getStatusLine().getStatusCode()); + + Request openJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + JOB_ID + "/_open"); + response = client().performRequest(openJob); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + @SuppressWarnings("unchecked") + private void assertUpgradedMappings() throws Exception { + + assertBusy(() -> { + Request getMappings = new Request("GET", AnomalyDetectorsIndex.resultsWriteAlias(JOB_ID) + "/_mappings"); + Response response = client().performRequest(getMappings); + + Map responseLevel = entityAsMap(response); + assertNotNull(responseLevel); + Map indexLevel = null; + // The name of the concrete index underlying the results index alias may or may not have been changed + // by the upgrade process (depending on what other tests are being run and the order they're run in), + // so navigating to the next level of the tree must account for both cases + for (Map.Entry entry : responseLevel.entrySet()) { + if (entry.getKey().startsWith(".ml-anomalies-") && entry.getKey().contains("mappings-upgrade-test")) { + indexLevel = (Map) entry.getValue(); + break; + } + } + assertNotNull(indexLevel); + Map mappingsLevel = (Map) indexLevel.get("mappings"); + assertNotNull(mappingsLevel); + Map typeLevel = (Map) mappingsLevel.get(ElasticsearchMappings.DOC_TYPE); + assertNotNull(typeLevel); + Map metaLevel = (Map) typeLevel.get("_meta"); + assertEquals(Collections.singletonMap("version", Version.CURRENT.toString()), metaLevel); + Map propertiesLevel = (Map) typeLevel.get("properties"); + assertNotNull(propertiesLevel); + // TODO: as the years go by, the field we assert on here should be changed + // to the most recent field we've added that is NOT of type "keyword" + Map fieldLevel = (Map) propertiesLevel.get("multi_bucket_impact"); + assertEquals(Collections.singletonMap("type", "double"), fieldLevel); + }); + } +}