Skip to content

Commit

Permalink
[ML] Update ML results mappings on process start
Browse files Browse the repository at this point in the history
This change moves the update to the results index mappings
from the open job action to the code that starts the
autodetect process.

When a rolling upgrade is performed we need to update the
mappings for already-open jobs that are reassigned from an
old version node to a new version node, but the open job
action is not called in this case.

Closes elastic#37607
  • Loading branch information
droberts195 committed Jan 22, 2019
1 parent 74d1cfb commit 9adcb63
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,23 @@
*/
package org.elasticsearch.xpack.core.ml.job.persistence;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
Expand Down Expand Up @@ -38,10 +53,16 @@
import org.elasticsearch.xpack.core.ml.notifications.AuditMessage;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

/**
* Static methods to create Elasticsearch index mappings for the autodetect
Expand Down Expand Up @@ -964,4 +985,95 @@ public static XContentBuilder auditMessageMapping() throws IOException {
.endObject()
.endObject();
}

static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion,
Logger logger) throws IOException {
List<String> indicesToUpdate = new ArrayList<>();

ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> currentMapping = state.metaData().findMappings(concreteIndices,
new String[] {DOC_TYPE}, MapperPlugin.NOOP_FIELD_FILTER);

for (String index : concreteIndices) {
ImmutableOpenMap<String, MappingMetaData> innerMap = currentMapping.get(index);
if (innerMap != null) {
MappingMetaData metaData = innerMap.get(DOC_TYPE);
try {
@SuppressWarnings("unchecked")
Map<String, Object> meta = (Map<String, Object>) metaData.sourceAsMap().get("_meta");
if (meta != null) {
String versionString = (String) meta.get("version");
if (versionString == null) {
logger.info("Version of mappings for [{}] not found, recreating", index);
indicesToUpdate.add(index);
continue;
}

Version mappingVersion = Version.fromString(versionString);

if (mappingVersion.onOrAfter(minVersion)) {
continue;
} else {
logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT);
indicesToUpdate.add(index);
continue;
}
} else {
logger.info("Version of mappings for [{}] not found, recreating", index);
indicesToUpdate.add(index);
continue;
}
} catch (Exception e) {
logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e);
indicesToUpdate.add(index);
continue;
}
} else {
logger.info("No mappings found for [{}], recreating", index);
indicesToUpdate.add(index);
}
}
return indicesToUpdate.toArray(new String[indicesToUpdate.size()]);
}

public static void addDocMappingIfMissing(String alias, CheckedSupplier<XContentBuilder, IOException> mappingSupplier,
Client client, Logger logger, ClusterState state, ActionListener<Boolean> listener) {
AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias);
if (aliasOrIndex == null) {
// The index has never been created yet
listener.onResponse(true);
return;
}
String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName)
.toArray(String[]::new);

String[] indicesThatRequireAnUpdate;
try {
indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT, logger);
} catch (IOException e) {
listener.onFailure(e);
return;
}

if (indicesThatRequireAnUpdate.length > 0) {
try (XContentBuilder mapping = mappingSupplier.get()) {
PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate);
putMappingRequest.type(DOC_TYPE);
putMappingRequest.source(mapping);
executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest,
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
listener.onResponse(true);
} else {
listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices "
+ Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged"));
}
}, listener::onFailure));
} catch (IOException e) {
listener.onFailure(e);
}
} else {
logger.trace("Mappings are up to date.");
listener.onResponse(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,18 @@
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
Expand All @@ -30,6 +38,8 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -128,6 +138,97 @@ public void testTermFieldMapping() throws IOException {
assertNull(instanceMapping);
}


public void testMappingRequiresUpdateNoMapping() throws IOException {
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
ClusterState cs = csBuilder.build();
String[] indices = new String[] { "no_index" };

assertArrayEquals(new String[] { "no_index" }, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
}

public void testMappingRequiresUpdateNullMapping() throws IOException {
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null));
String[] indices = new String[] { "null_index" };
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
}

public void testMappingRequiresUpdateNoVersion() throws IOException {
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD"));
String[] indices = new String[] { "no_version_field" };
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
}

public void testMappingRequiresUpdateRecentMappingVersion() throws IOException {
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString()));
String[] indices = new String[] { "version_current" };
assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
}

public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException {
ClusterState cs = getClusterStateWithMappingsWithMetaData(
Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0")));
String[] indices = new String[] { "version_nested" };
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
}

public void testMappingRequiresUpdateBogusMappingVersion() throws IOException {
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0"));
String[] indices = new String[] { "version_bogus" };
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger));
}

public void testMappingRequiresUpdateNewerMappingVersion() throws IOException {
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT));
String[] indices = new String[] { "version_newer" };
assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion(),
logger));
}

public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException {
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT));
String[] indices = new String[] { "version_newer_minor" };
assertArrayEquals(new String[] {},
ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion(), logger));
}


private ClusterState getClusterStateWithMappingsWithMetaData(Map<String, Object> namesAndVersions) throws IOException {
MetaData.Builder metaDataBuilder = MetaData.builder();

for (Map.Entry<String, Object> entry : namesAndVersions.entrySet()) {

String indexName = entry.getKey();
Object version = entry.getValue();

IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));

Map<String, Object> mapping = new HashMap<>();
Map<String, Object> properties = new HashMap<>();
for (int i = 0; i < 10; i++) {
properties.put("field" + i, Collections.singletonMap("type", "string"));
}
mapping.put("properties", properties);

Map<String, Object> meta = new HashMap<>();
if (version != null && version.equals("NO_VERSION_FIELD") == false) {
meta.put("version", version);
}
mapping.put("_meta", meta);

indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping));

metaDataBuilder.put(indexMetaData);
}
MetaData metaData = metaDataBuilder.build();

ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
csBuilder.metaData(metaData);
return csBuilder.build();
}

private Set<String> collectResultsDocFieldNames() throws IOException {
// Only the mappings for the results index should be added below. Do NOT add mappings for other indexes here.
return collectFieldNames(ElasticsearchMappings.resultsMapping());
Expand Down
Loading

0 comments on commit 9adcb63

Please sign in to comment.