diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java index efe2e6a217e65..6999d1dd4cba7 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java @@ -15,11 +15,13 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; public class DataFrameAnalyticsSource implements ToXContentObject { @@ -45,16 +47,20 @@ public static Builder builder() { (p, c) -> FetchSourceContext.fromXContent(p), _SOURCE, ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING); + PARSER.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD); } private final String[] index; private final QueryConfig queryConfig; private final FetchSourceContext sourceFiltering; + private final Map runtimeMappings; - private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig, @Nullable FetchSourceContext sourceFiltering) { + private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig, @Nullable FetchSourceContext sourceFiltering, + @Nullable Map runtimeMappings) { this.index = Objects.requireNonNull(index); this.queryConfig = queryConfig; this.sourceFiltering = sourceFiltering; + this.runtimeMappings = runtimeMappings; } public String[] getIndex() { @@ -69,6 +75,10 @@ public FetchSourceContext getSourceFiltering() { return sourceFiltering; } + public Map getRuntimeMappings() { + return runtimeMappings; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -79,6 +89,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (sourceFiltering != null) { builder.field(_SOURCE.getPreferredName(), sourceFiltering); } + if (runtimeMappings != null) { + builder.field(SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName(), runtimeMappings); + } builder.endObject(); return builder; } @@ -91,12 +104,13 @@ public boolean equals(Object o) { DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o; return Arrays.equals(index, other.index) && Objects.equals(queryConfig, other.queryConfig) - && Objects.equals(sourceFiltering, other.sourceFiltering); + && Objects.equals(sourceFiltering, other.sourceFiltering) + && Objects.equals(runtimeMappings, other.runtimeMappings); } @Override public int hashCode() { - return Objects.hash(Arrays.asList(index), queryConfig, sourceFiltering); + return Objects.hash(Arrays.asList(index), queryConfig, sourceFiltering, runtimeMappings); } @Override @@ -109,6 +123,7 @@ public static class Builder { private String[] index; private QueryConfig queryConfig; private FetchSourceContext sourceFiltering; + private Map runtimeMappings; private Builder() {} @@ -132,8 +147,13 @@ public Builder setSourceFiltering(FetchSourceContext sourceFiltering) { return this; } + public Builder setRuntimeMappings(Map runtimeMappings) { + this.runtimeMappings = runtimeMappings; + return this; + } + public DataFrameAnalyticsSource build() { - return new DataFrameAnalyticsSource(index, queryConfig, sourceFiltering); + return new DataFrameAnalyticsSource(index, queryConfig, sourceFiltering, runtimeMappings); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 67f5c8a34ec43..f6ac64c0002b3 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -2995,13 +2995,16 @@ public void testPutDataFrameAnalytics() throws Exception { QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder()); // end::put-data-frame-analytics-query-config + Map runtimeMappings = Collections.emptyMap(); + // tag::put-data-frame-analytics-source-config DataFrameAnalyticsSource sourceConfig = DataFrameAnalyticsSource.builder() // <1> .setIndex("put-test-source-index") // <2> .setQueryConfig(queryConfig) // <3> + .setRuntimeMappings(runtimeMappings) // <4> .setSourceFiltering(new FetchSourceContext(true, new String[] { "included_field_1", "included_field_2" }, - new String[] { "excluded_field" })) // <4> + new String[] { "excluded_field" })) // <5> .build(); // end::put-data-frame-analytics-source-config diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java index 3e7dd617fd744..162e80acc9204 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java @@ -16,6 +16,8 @@ import org.elasticsearch.test.AbstractXContentTestCase; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.function.Predicate; import static java.util.Collections.emptyList; @@ -31,11 +33,19 @@ public static DataFrameAnalyticsSource randomSourceConfig() { generateRandomStringArray(10, 10, false, false), generateRandomStringArray(10, 10, false, false)); } - + Map runtimeMappings = null; + if (randomBoolean()) { + runtimeMappings = new HashMap<>(); + Map runtimeField = new HashMap<>(); + runtimeField.put("type", "keyword"); + runtimeField.put("script", ""); + runtimeMappings.put(randomAlphaOfLength(10), runtimeField); + } return DataFrameAnalyticsSource.builder() .setIndex(generateRandomStringArray(10, 10, false, false)) .setQueryConfig(randomBoolean() ? null : randomQueryConfig()) .setSourceFiltering(sourceFiltering) + .setRuntimeMappings(runtimeMappings) .build(); } diff --git a/docs/java-rest/high-level/ml/put-data-frame-analytics.asciidoc b/docs/java-rest/high-level/ml/put-data-frame-analytics.asciidoc index 19b21b0d7e91c..30914d539ce7c 100644 --- a/docs/java-rest/high-level/ml/put-data-frame-analytics.asciidoc +++ b/docs/java-rest/high-level/ml/put-data-frame-analytics.asciidoc @@ -55,7 +55,8 @@ include-tagged::{doc-tests-file}[{api}-source-config] <1> Constructing a new DataFrameAnalyticsSource <2> The source index <3> The query from which to gather the data. If query is not set, a `match_all` query is used by default. -<4> Source filtering to select which fields will exist in the destination index. +<4> Runtime mappings that will be added to the destination index mapping. +<5> Source filtering to select which fields will exist in the destination index. ===== QueryConfig diff --git a/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc b/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc index 5cf3b3ed306b8..848b6eb767304 100644 --- a/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc +++ b/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc @@ -523,7 +523,7 @@ setting, an error occurs when you try to create {dfanalytics-jobs} that have `source`:: (object) The configuration of how to source the analysis data. It requires an `index`. -Optionally, `query` and `_source` may be specified. +Optionally, `query`, `runtime_mappings`, and `_source` may be specified. + .Properties of `source` [%collapsible%open] @@ -543,6 +543,10 @@ options that are supported by {es} can be used, as this object is passed verbatim to {es}. By default, this property has the following value: `{"match_all": {}}`. +`runtime_mappings`::: +(Optional, object) Definitions of runtime fields that will become part of the +mapping of the destination index. + `_source`::: (Optional, object) Specify `includes` and/or `excludes` patterns to select which fields will be present in the destination. Fields that are excluded cannot be diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 0e07ff1c3fe25..9c74882fa9c96 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; /** * A search action request builder. @@ -592,4 +593,12 @@ public SearchRequestBuilder setPreFilterShardSize(int preFilterShardSize) { this.request.setPreFilterShardSize(preFilterShardSize); return this; } + + /** + * Set runtime mappings to create runtime fields that exist only as part of this particular search. + */ + public SearchRequestBuilder setRuntimeMappings(Map runtimeMappings) { + sourceBuilder().runtimeMappings(runtimeMappings); + return this; + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index c0230d13ea7aa..d2fe0b1831b72 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.ml.utils.QueryProvider; +import org.elasticsearch.xpack.core.ml.utils.RuntimeMappingsValidator; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer; @@ -825,7 +826,7 @@ public DatafeedConfig build() { } validateScriptFields(); - validateRuntimeMappings(); + RuntimeMappingsValidator.validate(runtimeMappings); setDefaultChunkingConfig(); setDefaultQueryDelay(); @@ -846,28 +847,6 @@ void validateScriptFields() { } } - /** - * Perform a light check that the structure resembles runtime_mappings. - * The full check cannot happen until search - */ - void validateRuntimeMappings() { - for (Map.Entry entry : runtimeMappings.entrySet()) { - // top level objects are fields - String fieldName = entry.getKey(); - if (entry.getValue() instanceof Map) { - @SuppressWarnings("unchecked") - Map propNode = new HashMap<>(((Map) entry.getValue())); - Object typeNode = propNode.get("type"); - if (typeNode == null) { - throw ExceptionsHelper.badRequestException("No type specified for runtime field [" + fieldName + "]"); - } - } else { - throw ExceptionsHelper.badRequestException("Expected map for runtime field [" + fieldName + "] " + - "definition but got a " + fieldName.getClass().getSimpleName()); - } - } - } - private static void checkNoMoreHistogramAggregations(Collection aggregations) { for (AggregationBuilder agg : aggregations) { if (ExtractorUtils.isHistogram(agg)) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSource.java index 13e06ae2e4df0..b0db6d7083e4b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSource.java @@ -21,15 +21,19 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.QueryProvider; +import org.elasticsearch.xpack.core.ml.utils.RuntimeMappingsValidator; import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -45,7 +49,8 @@ public static ConstructingObjectParser createPar ignoreUnknownFields, a -> new DataFrameAnalyticsSource( ((List) a[0]).toArray(new String[0]), (QueryProvider) a[1], - (FetchSourceContext) a[2])); + (FetchSourceContext) a[2], + (Map) a[3])); parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDEX); parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> QueryProvider.fromXContent(p, ignoreUnknownFields, Messages.DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT), QUERY); @@ -53,14 +58,18 @@ public static ConstructingObjectParser createPar (p, c) -> FetchSourceContext.fromXContent(p), _SOURCE, ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING); + parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), + SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD); return parser; } private final String[] index; private final QueryProvider queryProvider; private final FetchSourceContext sourceFiltering; + private final Map runtimeMappings; - public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider, @Nullable FetchSourceContext sourceFiltering) { + public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider, @Nullable FetchSourceContext sourceFiltering, + @Nullable Map runtimeMappings) { this.index = ExceptionsHelper.requireNonNull(index, INDEX); if (index.length == 0) { throw new IllegalArgumentException("source.index must specify at least one index"); @@ -73,6 +82,8 @@ public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryPro throw new IllegalArgumentException("source._source cannot be disabled"); } this.sourceFiltering = sourceFiltering; + this.runtimeMappings = runtimeMappings == null ? Collections.emptyMap() : Collections.unmodifiableMap(runtimeMappings); + RuntimeMappingsValidator.validate(this.runtimeMappings); } public DataFrameAnalyticsSource(StreamInput in) throws IOException { @@ -83,6 +94,11 @@ public DataFrameAnalyticsSource(StreamInput in) throws IOException { } else { sourceFiltering = null; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + runtimeMappings = in.readMap(); + } else { + runtimeMappings = Collections.emptyMap(); + } } public DataFrameAnalyticsSource(DataFrameAnalyticsSource other) { @@ -90,6 +106,7 @@ public DataFrameAnalyticsSource(DataFrameAnalyticsSource other) { this.queryProvider = new QueryProvider(other.queryProvider); this.sourceFiltering = other.sourceFiltering == null ? null : new FetchSourceContext( other.sourceFiltering.fetchSource(), other.sourceFiltering.includes(), other.sourceFiltering.excludes()); + this.runtimeMappings = Collections.unmodifiableMap(new HashMap<>(other.runtimeMappings)); } @Override @@ -99,6 +116,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_6_0)) { out.writeOptionalWriteable(sourceFiltering); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeMap(runtimeMappings); + } } @Override @@ -109,6 +129,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (sourceFiltering != null) { builder.field(_SOURCE.getPreferredName(), sourceFiltering); } + if (runtimeMappings.isEmpty() == false) { + builder.field(SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName(), runtimeMappings); + } builder.endObject(); return builder; } @@ -121,12 +144,13 @@ public boolean equals(Object o) { DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o; return Arrays.equals(index, other.index) && Objects.equals(queryProvider, other.queryProvider) - && Objects.equals(sourceFiltering, other.sourceFiltering); + && Objects.equals(sourceFiltering, other.sourceFiltering) + && Objects.equals(runtimeMappings, other.runtimeMappings); } @Override public int hashCode() { - return Objects.hash(Arrays.asList(index), queryProvider, sourceFiltering); + return Objects.hash(Arrays.asList(index), queryProvider, sourceFiltering, runtimeMappings); } public String[] getIndex() { @@ -189,6 +213,10 @@ Map getQuery() { return queryProvider.getQuery(); } + public Map getRuntimeMappings() { + return runtimeMappings; + } + public boolean isFieldExcluded(String path) { if (sourceFiltering == null) { return false; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidator.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidator.java new file mode 100644 index 0000000000000..ae7037d15bd02 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidator.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.ml.utils; + +import java.util.HashMap; +import java.util.Map; + +public final class RuntimeMappingsValidator { + + /** + * Perform a light check that the structure resembles runtime_mappings. + * The full check cannot happen until search + */ + public static void validate(Map runtimeMappings) { + for (Map.Entry entry : runtimeMappings.entrySet()) { + // top level objects are fields + String fieldName = entry.getKey(); + if (entry.getValue() instanceof Map) { + @SuppressWarnings("unchecked") + Map propNode = new HashMap<>(((Map) entry.getValue())); + Object typeNode = propNode.get("type"); + if (typeNode == null) { + throw ExceptionsHelper.badRequestException("No type specified for runtime field [{}]", fieldName); + } + } else { + throw ExceptionsHelper.badRequestException( + "Expected map for runtime field [{}] definition but got a {}", + fieldName, + fieldName.getClass().getSimpleName() + ); + } + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json index f01e1d9a61abc..68a063acbbb10 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json @@ -464,6 +464,10 @@ "query" : { "type" : "object", "enabled" : false + }, + "runtime_mappings" : { + "type" : "object", + "enabled" : false } } }, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsActionRequestTests.java index a3ff6392a1647..cad20194dfc59 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsActionRequestTests.java @@ -81,7 +81,7 @@ protected Request doParseInstance(XContentParser parser) { public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsExcludedInSourceFiltering() { DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] {"index"}, null, - new FetchSourceContext(true, null, new String[] {"excluded"})); + new FetchSourceContext(true, null, new String[] {"excluded"}), null); FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"excluded"}, null); DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId("foo") @@ -99,7 +99,7 @@ public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsExcludedInSo public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsIncludedInSourceFiltering() { DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] {"index"}, null, - new FetchSourceContext(true, new String[] {"included"}, null)); + new FetchSourceContext(true, new String[] {"included"}, null), null); FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"included"}, null); DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId("foo") diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java index 86e5ee3b365c2..b8c9bfaf71dcb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java @@ -468,7 +468,7 @@ public void testToXContent_GivenAnalysisWithRandomizeSeedAndVersionIsCurrent() t DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setVersion(Version.CURRENT) .setId("test_config") - .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("dest_index", null)) .setAnalysis(regression) .build(); @@ -487,7 +487,7 @@ public void testToXContent_GivenAnalysisWithRandomizeSeedAndVersionIsBeforeItWas DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setVersion(Version.V_7_5_0) .setId("test_config") - .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("dest_index", null)) .setAnalysis(regression) .build(); @@ -509,7 +509,7 @@ public void testExtractJobIdFromDocId() { public void testCtor_GivenMaxNumThreadsIsZero() { ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder() .setId("test_config") - .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("dest_index", null)) .setAnalysis(new Regression("foo")) .setMaxNumThreads(0) @@ -522,7 +522,7 @@ public void testCtor_GivenMaxNumThreadsIsZero() { public void testCtor_GivenMaxNumThreadsIsNegative() { ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder() .setId("test_config") - .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("dest_index", null)) .setAnalysis(new Regression("foo")) .setMaxNumThreads(randomIntBetween(Integer.MIN_VALUE, 0)) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java index 9a7edca44953f..25b4d17a00bcb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.core.ml.dataframe; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; @@ -22,7 +23,9 @@ import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -68,12 +71,24 @@ public static DataFrameAnalyticsSource createRandom() { generateRandomStringArray(10, 10, false, false), generateRandomStringArray(10, 10, false, false)); } - return new DataFrameAnalyticsSource(index, queryProvider, sourceFiltering); + Map runtimeMappings = null; + if (randomBoolean()) { + runtimeMappings = new HashMap<>(); + Map runtimeField = new HashMap<>(); + runtimeField.put("type", "keyword"); + runtimeField.put("script", ""); + runtimeMappings.put(randomAlphaOfLength(10), runtimeField); + + } + return new DataFrameAnalyticsSource(index, queryProvider, sourceFiltering, runtimeMappings); } public static DataFrameAnalyticsSource mutateForVersion(DataFrameAnalyticsSource instance, Version version) { if (version.before(Version.V_7_6_0)) { - return new DataFrameAnalyticsSource(instance.getIndex(), instance.getQueryProvider(), null); + return new DataFrameAnalyticsSource(instance.getIndex(), instance.getQueryProvider(), null, null); + } + if (version.before(Version.V_8_0_0)) { + return new DataFrameAnalyticsSource(instance.getIndex(), instance.getQueryProvider(), instance.getSourceFiltering(), null); } return instance; } @@ -85,18 +100,24 @@ protected Writeable.Reader instanceReader() { public void testConstructor_GivenDisabledSource() { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new DataFrameAnalyticsSource( - new String[] {"index"}, null, new FetchSourceContext(false, null, null))); + new String[] {"index"}, null, new FetchSourceContext(false, null, null), null)); assertThat(e.getMessage(), equalTo("source._source cannot be disabled")); } + public void testConstructor_GivenInvalidRuntimeMappings() { + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsSource( + new String[] {"index"}, null, null, Collections.singletonMap("typeless", Collections.singletonMap("not a type", "42")))); + assertThat(e.getMessage(), equalTo("No type specified for runtime field [typeless]")); + } + public void testIsFieldExcluded_GivenNoSourceFiltering() { - DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null, null); + DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null, null, null); assertThat(source.isFieldExcluded(randomAlphaOfLength(10)), is(false)); } public void testIsFieldExcluded_GivenSourceFilteringWithNulls() { DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null, - new FetchSourceContext(true, null, null)); + new FetchSourceContext(true, null, null), null); assertThat(source.isFieldExcluded(randomAlphaOfLength(10)), is(false)); } @@ -139,7 +160,7 @@ private static DataFrameAnalyticsSource newSourceWithExcludes(String... excludes private static DataFrameAnalyticsSource newSourceWithIncludesExcludes(List includes, List excludes) { FetchSourceContext sourceFiltering = new FetchSourceContext(true, includes.toArray(new String[0]), excludes.toArray(new String[0])); - return new DataFrameAnalyticsSource(new String[] { "index" } , null, sourceFiltering); + return new DataFrameAnalyticsSource(new String[] { "index" } , null, sourceFiltering, null); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidatorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidatorTests.java new file mode 100644 index 0000000000000..308dc6d693e81 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidatorTests.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.ml.utils; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class RuntimeMappingsValidatorTests extends ESTestCase { + + public void testValidate_GivenFieldWhoseValueIsNotMap() { + Map runtimeMappings = Collections.singletonMap("mapless", "not a map"); + + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> RuntimeMappingsValidator.validate(runtimeMappings)); + + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("Expected map for runtime field [mapless] definition but got a String")); + } + + public void testValidate_GivenFieldWithoutType() { + Map fieldMapping = Collections.singletonMap("not a type", "42"); + Map runtimeMappings = Collections.singletonMap("typeless", fieldMapping); + + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> RuntimeMappingsValidator.validate(runtimeMappings)); + + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("No type specified for runtime field [typeless]")); + } + + public void testValidate_GivenValid() { + Map fieldMapping = Collections.singletonMap("type", "keyword"); + Map runtimeMappings = Collections.singletonMap("valid_field", fieldMapping); + + RuntimeMappingsValidator.validate(runtimeMappings); + } +} diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index a03ff5a8c07a1..61d2696eca114 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -28,15 +28,19 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetTrainedModelsAction; import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams; import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification; import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression; import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.Accuracy; import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.AucRoc; import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.MulticlassConfusionMatrix; @@ -47,7 +51,6 @@ import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig; import org.elasticsearch.xpack.core.ml.inference.preprocessing.OneHotEncoding; import org.elasticsearch.xpack.core.ml.inference.preprocessing.PreProcessor; -import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; import org.junit.After; import org.junit.Before; @@ -55,6 +58,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -62,12 +66,12 @@ import static java.util.stream.Collectors.toList; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.emptyString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.in; @@ -810,6 +814,85 @@ public void testTooLowConfiguredMemoryStillStarts() throws Exception { waitUntilAnalyticsIsStopped(jobId); } + public void testWithSearchRuntimeMappings() throws Exception { + initialize("classification_with_search_runtime_mappings"); + indexData(sourceIndex, 300, 50, KEYWORD_FIELD); + + String numericRuntimeField = NUMERICAL_FIELD + "_runtime"; + String dependentVariableRuntimeField = KEYWORD_FIELD + "_runtime"; + + String predictedClassField = dependentVariableRuntimeField + "_prediction"; + + Map numericRuntimeFieldMapping = new HashMap<>(); + numericRuntimeFieldMapping.put("type", "double"); + numericRuntimeFieldMapping.put("script", "emit(doc['" + NUMERICAL_FIELD + "'].value)"); + Map dependentVariableRuntimeFieldMapping = new HashMap<>(); + dependentVariableRuntimeFieldMapping.put("type", "keyword"); + dependentVariableRuntimeFieldMapping.put("script", + "if (doc['" + KEYWORD_FIELD + "'].size() > 0) { emit(doc['" + KEYWORD_FIELD + "'].value); }"); + Map runtimeFields = new HashMap<>(); + runtimeFields.put(numericRuntimeField, numericRuntimeFieldMapping); + runtimeFields.put(dependentVariableRuntimeField, dependentVariableRuntimeFieldMapping); + + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId(jobId) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, runtimeFields)) + .setDest(new DataFrameAnalyticsDest(destIndex, null)) + .setAnalyzedFields(new FetchSourceContext(true, new String[] { numericRuntimeField, dependentVariableRuntimeField }, null)) + .setAnalysis(new Classification( + dependentVariableRuntimeField, + BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(), + predictedClassField, + null, + null, + null, + null, + null, + null)) + .build(); + + putAnalytics(config); + + assertIsStopped(jobId); + assertProgressIsZero(jobId); + + startAnalytics(jobId); + waitUntilAnalyticsIsStopped(jobId); + + client().admin().indices().refresh(new RefreshRequest(destIndex)); + SearchResponse destData = client().prepareSearch(destIndex).setTrackTotalHits(true).setSize(1000).get(); + for (SearchHit hit : destData.getHits()) { + Map destDoc = hit.getSourceAsMap(); + Map resultsObject = getFieldValue(destDoc, "ml"); + assertThat(getFieldValue(resultsObject, predictedClassField), is(in(KEYWORD_FIELD_VALUES))); + assertThat(getFieldValue(resultsObject, "is_training"), is(destDoc.containsKey(KEYWORD_FIELD))); + assertTopClasses(resultsObject, 2, dependentVariableRuntimeField, KEYWORD_FIELD_VALUES); + @SuppressWarnings("unchecked") + List> importanceArray = (List>)resultsObject.get("feature_importance"); + assertThat(importanceArray, hasSize(1)); + assertThat(importanceArray.get(0), hasEntry("feature_name", numericRuntimeField)); + } + + assertProgressComplete(jobId); + assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L)); + assertModelStatePersisted(stateDocId()); + assertExactlyOneInferenceModelPersisted(jobId); + assertMlResultsFieldMappings(destIndex, predictedClassField, "keyword"); + assertThatAuditMessagesMatch(jobId, + "Created analytics with analysis type [classification]", + "Estimated memory usage for this analytics to be", + "Starting analytics on node", + "Started analytics", + expectedDestIndexAuditMessage(), + "Started reindexing to destination index [" + destIndex + "]", + "Finished reindexing to destination index [" + destIndex + "]", + "Started loading data", + "Started analyzing", + "Started writing results", + "Finished analysis"); + assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField); + } + private static T getOnlyElement(List list) { assertThat(list, hasSize(1)); return list.get(0); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalysisCustomFeatureIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalysisCustomFeatureIT.java index e85dbdb58cc67..c4da2e83758ae 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalysisCustomFeatureIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalysisCustomFeatureIT.java @@ -110,7 +110,7 @@ public void testNGramCustomFeature() throws Exception { DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(jobId) .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, - QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()), null)) + QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()), null, null)) .setDest(new DataFrameAnalyticsDest(destIndex, null)) .setAnalysis(new Regression(NUMERICAL_FIELD, BoostedTreeParams.builder().setNumTopFeatureImportanceValues(6).build(), diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java index 289a0d64d7631..3d20104430c8b 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java @@ -8,6 +8,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; @@ -17,25 +18,33 @@ import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams; import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification; -import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression; import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression; +import org.elasticsearch.xpack.core.ml.dataframe.explain.FieldSelection; import org.elasticsearch.xpack.core.ml.utils.QueryProvider; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; public class ExplainDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTestCase { public void testExplain_GivenMissingSourceIndex() { DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() - .setSource(new DataFrameAnalyticsSource(new String[] {"missing_index"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"missing_index"}, null, null, Collections.emptyMap())) .setAnalysis(new OutlierDetection.Builder().build()) .buildForExplain(); @@ -81,7 +90,8 @@ public void testSourceQueryIsApplied() throws IOException { .setId(id) .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, QueryProvider.fromParsedQuery(QueryBuilders.termQuery("filtered_field", "bingo")), - null)) + null, + Collections.emptyMap())) .setAnalysis(new Classification("categorical")) .buildForExplain(); @@ -98,7 +108,8 @@ public void testTrainingPercentageIsApplied() throws IOException { .setId("dfa-training-100-" + sourceIndex) .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()), - null)) + null, + Collections.emptyMap())) .setAnalysis(new Regression(RegressionIT.DEPENDENT_VARIABLE_FIELD, BoostedTreeParams.builder().build(), null, @@ -118,7 +129,8 @@ public void testTrainingPercentageIsApplied() throws IOException { .setId("dfa-training-50-" + sourceIndex) .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()), - null)) + null, + Collections.emptyMap())) .setAnalysis(new Regression(RegressionIT.DEPENDENT_VARIABLE_FIELD, BoostedTreeParams.builder().build(), null, @@ -147,7 +159,8 @@ public void testSimultaneousExplainSameConfig() throws IOException { .setId("dfa-simultaneous-explain-" + sourceIndex) .setSource(new DataFrameAnalyticsSource(new String[]{sourceIndex}, QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()), - null)) + null, + Collections.emptyMap())) .setAnalysis(new Regression(RegressionIT.DEPENDENT_VARIABLE_FIELD, BoostedTreeParams.builder().build(), null, @@ -181,6 +194,57 @@ public void testSimultaneousExplainSameConfig() throws IOException { } } + public void testRuntimeFields() { + String sourceIndex = "test-explain-runtime-fields"; + String mapping = "{\n" + + " \"properties\": {\n" + + " \"mapped_field\": {\n" + + " \"type\": \"double\"\n" + + " }\n" + + " },\n" + + " \"runtime\": {\n" + + " \"mapped_runtime_field\": {\n" + + " \"type\": \"double\"\n," + + " \"script\": \"emit(doc['mapped_field'].value + 10.0)\"\n" + + " }\n" + + " }\n" + + " }"; + client().admin().indices().prepareCreate(sourceIndex) + .setMapping(mapping) + .get(); + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk() + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < 10; i++) { + Object[] source = new Object[] {"mapped_field", i}; + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source).opType(DocWriteRequest.OpType.CREATE); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } + + Map configRuntimeField = new HashMap<>(); + configRuntimeField.put("type", "double"); + configRuntimeField.put("script", "emit(doc['mapped_field'].value + 20.0)"); + Map configRuntimeFields = Collections.singletonMap("config_runtime_field", configRuntimeField); + + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId(sourceIndex + "-job") + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, configRuntimeFields)) + .setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null)) + .setAnalysis(new OutlierDetection.Builder().build()) + .build(); + + ExplainDataFrameAnalyticsAction.Response explainResponse = explainDataFrame(config); + List fieldSelection = explainResponse.getFieldSelection(); + + assertThat(fieldSelection.size(), equalTo(3)); + assertThat(fieldSelection.stream().map(FieldSelection::getName).collect(Collectors.toList()), + contains("config_runtime_field", "mapped_field", "mapped_runtime_field")); + assertThat(fieldSelection.stream().map(FieldSelection::isIncluded).allMatch(isIncluded -> isIncluded), is(true)); + } + @Override boolean supportsInference() { return false; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index fb34d4cc27975..04937e60cd7ab 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -200,7 +200,8 @@ protected static DataFrameAnalyticsConfig buildAnalytics(String id, String sourc QueryBuilder queryBuilder) throws Exception { return new DataFrameAnalyticsConfig.Builder() .setId(id) - .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, QueryProvider.fromParsedQuery(queryBuilder), null)) + .setSource(new DataFrameAnalyticsSource( + new String[] { sourceIndex }, QueryProvider.fromParsedQuery(queryBuilder), null, Collections.emptyMap())) .setDest(new DataFrameAnalyticsDest(destIndex, resultsField)) .setAnalysis(analysis) .build(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index e676d4adb9453..58810a7653b12 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -50,6 +51,8 @@ import static org.hamcrest.Matchers.emptyString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @@ -572,7 +575,7 @@ public void testAliasFields() throws Exception { null); DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(jobId) - .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, Collections.emptyMap())) .setDest(new DataFrameAnalyticsDest(destIndex, null)) .setAnalysis(regression) .setAnalyzedFields(new FetchSourceContext(true, null, new String[] {"field_1"})) @@ -693,6 +696,83 @@ public void testWithCustomFeatureProcessors() throws Exception { } } + public void testWithSearchRuntimeMappings() throws Exception { + initialize("regression_with_search_runtime_mappings"); + indexData(sourceIndex, 300, 50); + + String numericRuntimeField = NUMERICAL_FEATURE_FIELD + "_runtime"; + String dependentVariableRuntimeField = DEPENDENT_VARIABLE_FIELD + "_runtime"; + + String predictedClassField = dependentVariableRuntimeField + "_prediction"; + + Map numericRuntimeFieldMapping = new HashMap<>(); + numericRuntimeFieldMapping.put("type", "double"); + numericRuntimeFieldMapping.put("script", "emit(doc['" + NUMERICAL_FEATURE_FIELD + "'].value)"); + Map dependentVariableRuntimeFieldMapping = new HashMap<>(); + dependentVariableRuntimeFieldMapping.put("type", "double"); + dependentVariableRuntimeFieldMapping.put("script", + "if (doc['" + DEPENDENT_VARIABLE_FIELD + "'].size() > 0) { emit(doc['" + DEPENDENT_VARIABLE_FIELD + "'].value); }"); + Map runtimeFields = new HashMap<>(); + runtimeFields.put(numericRuntimeField, numericRuntimeFieldMapping); + runtimeFields.put(dependentVariableRuntimeField, dependentVariableRuntimeFieldMapping); + + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId(jobId) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, runtimeFields)) + .setDest(new DataFrameAnalyticsDest(destIndex, null)) + .setAnalyzedFields(new FetchSourceContext(true, new String[] { numericRuntimeField, dependentVariableRuntimeField }, null)) + .setAnalysis(new Regression( + dependentVariableRuntimeField, + BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(), + null, + null, + null, + null, + null, + null, + null)) + .build(); + putAnalytics(config); + + assertIsStopped(jobId); + assertProgressIsZero(jobId); + + startAnalytics(jobId); + waitUntilAnalyticsIsStopped(jobId); + + SearchResponse destData = client().prepareSearch(destIndex).setTrackTotalHits(true).setSize(1000).get(); + for (SearchHit hit : destData.getHits()) { + Map destDoc = hit.getSourceAsMap(); + Map resultsObject = getMlResultsObjectFromDestDoc(destDoc); + + assertThat(resultsObject.containsKey(predictedClassField), is(true)); + assertThat(resultsObject.containsKey("is_training"), is(true)); + assertThat(resultsObject.get("is_training"), is(destDoc.containsKey(DEPENDENT_VARIABLE_FIELD))); + @SuppressWarnings("unchecked") + List> importanceArray = (List>)resultsObject.get("feature_importance"); + assertThat(importanceArray, hasSize(1)); + assertThat(importanceArray.get(0), hasEntry("feature_name", numericRuntimeField)); + } + + assertProgressComplete(jobId); + assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L)); + assertModelStatePersisted(stateDocId()); + assertExactlyOneInferenceModelPersisted(jobId); + assertMlResultsFieldMappings(destIndex, predictedClassField, "double"); + assertThatAuditMessagesMatch(jobId, + "Created analytics with analysis type [regression]", + "Estimated memory usage for this analytics to be", + "Starting analytics on node", + "Started analytics", + "Creating destination index [" + destIndex + "]", + "Started reindexing to destination index [" + destIndex + "]", + "Finished reindexing to destination index [" + destIndex + "]", + "Started loading data", + "Started analyzing", + "Started writing results", + "Finished analysis"); + } + private void initialize(String jobId) { this.jobId = jobId; this.sourceIndex = jobId + "_source_index"; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index f2d4d2ebe526d..8ba7fcb6c34b1 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -10,8 +10,6 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; @@ -29,15 +27,15 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection; -import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; import org.junit.After; import org.junit.Before; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.emptyString; import static org.hamcrest.Matchers.equalTo; @@ -391,7 +389,7 @@ public void testOutlierDetectionWithMultipleSourceIndices() throws Exception { String id = "test_outlier_detection_with_multiple_source_indices"; DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(id) - .setSource(new DataFrameAnalyticsSource(sourceIndex, null, null)) + .setSource(new DataFrameAnalyticsSource(sourceIndex, null, null, null)) .setDest(new DataFrameAnalyticsDest(destIndex, null)) .setAnalysis(new OutlierDetection.Builder().build()) .build(); @@ -513,7 +511,7 @@ public void testModelMemoryLimitLowerThanEstimatedMemoryUsage() throws Exception ByteSizeValue modelMemoryLimit = ByteSizeValue.ofMb(1); DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(id) - .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, null)) .setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null)) .setAnalysis(new OutlierDetection.Builder().build()) .setModelMemoryLimit(modelMemoryLimit) @@ -550,7 +548,7 @@ public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws ByteSizeValue modelMemoryLimit = ByteSizeValue.ofTb(1); DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(id) - .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, null)) .setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null)) .setAnalysis(new OutlierDetection.Builder().build()) .setModelMemoryLimit(modelMemoryLimit) @@ -834,6 +832,112 @@ public void testOutlierDetection_GivenIndexWithRuntimeFields() throws Exception "Finished analysis"); } + public void testOutlierDetection_GivenSearchRuntimeMappings() throws Exception { + String sourceIndex = "test-outlier-detection-index-with-search-runtime-fields"; + + String mappings = "{\"enabled\": false}"; + + client().admin().indices().prepareCreate(sourceIndex) + .setMapping(mappings) + .get(); + + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); + + // We insert one odd value out of 5 for one feature + String docId = i == 0 ? "outlier" : "normal" + i; + indexRequest.id(docId); + indexRequest.source("numeric", i == 0 ? 100.0 : 1.0); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } + + String id = "test_outlier_detection_index_with_search_runtime_fields"; + + Map runtimeMappings = new HashMap<>(); + Map numericFieldRuntimeMapping = new HashMap<>(); + numericFieldRuntimeMapping.put("type", "double"); + numericFieldRuntimeMapping.put("script", "emit(params._source.numeric)"); + runtimeMappings.put("runtime_numeric", numericFieldRuntimeMapping); + + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId(id) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, runtimeMappings)) + .setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null)) + .setAnalysis(new OutlierDetection.Builder().build()) + .build(); + putAnalytics(config); + + assertIsStopped(id); + assertProgressIsZero(id); + + startAnalytics(id); + waitUntilAnalyticsIsStopped(id); + GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id); + assertThat(stats.getDataCounts().getJobId(), equalTo(id)); + assertThat(stats.getDataCounts().getTrainingDocsCount(), equalTo(5L)); + assertThat(stats.getDataCounts().getTestDocsCount(), equalTo(0L)); + assertThat(stats.getDataCounts().getSkippedDocsCount(), equalTo(0L)); + + SearchResponse sourceData = client().prepareSearch(sourceIndex).get(); + double scoreOfOutlier = 0.0; + double scoreOfNonOutlier = -1.0; + for (SearchHit hit : sourceData.getHits()) { + GetResponse destDocGetResponse = client().prepareGet().setIndex(config.getDest().getIndex()).setId(hit.getId()).get(); + assertThat(destDocGetResponse.isExists(), is(true)); + Map sourceDoc = hit.getSourceAsMap(); + Map destDoc = destDocGetResponse.getSource(); + for (String field : sourceDoc.keySet()) { + assertThat(destDoc.containsKey(field), is(true)); + assertThat(destDoc.get(field), equalTo(sourceDoc.get(field))); + } + assertThat(destDoc.containsKey("ml"), is(true)); + + @SuppressWarnings("unchecked") + Map resultsObject = (Map) destDoc.get("ml"); + + assertThat(resultsObject.containsKey("outlier_score"), is(true)); + double outlierScore = (double) resultsObject.get("outlier_score"); + assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(1.0))); + if (hit.getId().equals("outlier")) { + scoreOfOutlier = outlierScore; + + @SuppressWarnings("unchecked") + List> featureInfluence = (List>) resultsObject.get("feature_influence"); + assertThat(featureInfluence.size(), equalTo(1)); + assertThat(featureInfluence.get(0).get("feature_name"), equalTo("runtime_numeric")); + } else { + if (scoreOfNonOutlier < 0) { + scoreOfNonOutlier = outlierScore; + } else { + assertThat(outlierScore, equalTo(scoreOfNonOutlier)); + } + } + } + assertThat(scoreOfOutlier, is(greaterThan(scoreOfNonOutlier))); + + assertProgressComplete(id); + assertThat(searchStoredProgress(id).getHits().getTotalHits().value, equalTo(1L)); + assertThatAuditMessagesMatch(id, + "Created analytics with analysis type [outlier_detection]", + "Estimated memory usage for this analytics to be", + "Starting analytics on node", + "Started analytics", + "Creating destination index [" + sourceIndex + "-results]", + "Started reindexing to destination index [" + sourceIndex + "-results]", + "Finished reindexing to destination index [" + sourceIndex + "-results]", + "Started loading data", + "Started analyzing", + "Started writing results", + "Finished analysis"); + } + @Override boolean supportsInference() { return false; diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedModelPersisterIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedModelPersisterIT.java index cb68f15e750bc..e9876ac04ddfd 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedModelPersisterIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedModelPersisterIT.java @@ -71,7 +71,7 @@ public void testStoreModelViaChunkedPersister() throws IOException { String modelId = "stored-chunked-model"; DataFrameAnalyticsConfig analyticsConfig = new DataFrameAnalyticsConfig.Builder() .setId(modelId) - .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("my_dest", null)) .setAnalysis(new Regression("foo")) .build(); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java index 95034146eba49..0b2bbaf5eaf03 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java @@ -64,7 +64,7 @@ public void testRemoveUnusedStats() throws Exception { PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(new DataFrameAnalyticsConfig.Builder() .setId("analytics-with-stats") .setModelMemoryLimit(ByteSizeValue.ofGb(1)) - .setSource(new DataFrameAnalyticsSource(new String[]{"foo"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[]{"foo"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("bar", null)) .setAnalysis(new Regression("prediction")) .build()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java index e66b1a4ce6a13..5cb2a709924bb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java @@ -72,6 +72,7 @@ public final class DestinationIndex { private static final String PROPERTIES = "properties"; private static final String META = "_meta"; + private static final String RUNTIME = "runtime"; private static final String DFA_CREATOR = "data-frame-analytics"; @@ -124,23 +125,7 @@ private static void prepareCreateIndexRequest(Client client, Clock clock, DataFr ActionListener mappingsListener = ActionListener.wrap( mappings -> { mappingsHolder.set(mappings); - - List requiredFields = config.getAnalysis().getRequiredFields(); - if (requiredFields.isEmpty()) { - fieldCapabilitiesListener.onResponse(null); - return; - } - FieldCapabilitiesRequest fieldCapabilitiesRequest = - new FieldCapabilitiesRequest() - .indices(config.getSource().getIndex()) - .fields(requiredFields.stream().map(RequiredField::getName).toArray(String[]::new)); - ClientHelper.executeWithHeadersAsync( - config.getHeaders(), - ML_ORIGIN, - client, - FieldCapabilitiesAction.INSTANCE, - fieldCapabilitiesRequest, - fieldCapabilitiesListener); + getFieldCapsForRequiredFields(client, config, fieldCapabilitiesListener); }, listener::onFailure ); @@ -167,6 +152,27 @@ private static void prepareCreateIndexRequest(Client client, Clock clock, DataFr config.getHeaders(), ML_ORIGIN, client, GetSettingsAction.INSTANCE, getSettingsRequest, getSettingsResponseListener); } + private static void getFieldCapsForRequiredFields(Client client, DataFrameAnalyticsConfig config, + ActionListener listener) { + List requiredFields = config.getAnalysis().getRequiredFields(); + if (requiredFields.isEmpty()) { + listener.onResponse(null); + return; + } + FieldCapabilitiesRequest fieldCapabilitiesRequest = + new FieldCapabilitiesRequest() + .indices(config.getSource().getIndex()) + .fields(requiredFields.stream().map(RequiredField::getName).toArray(String[]::new)) + .runtimeFields(config.getSource().getRuntimeMappings()); + ClientHelper.executeWithHeadersAsync( + config.getHeaders(), + ML_ORIGIN, + client, + FieldCapabilitiesAction.INSTANCE, + fieldCapabilitiesRequest, + listener); + } + private static CreateIndexRequest createIndexRequest(Clock clock, DataFrameAnalyticsConfig config, Settings settings, @@ -179,6 +185,8 @@ private static CreateIndexRequest createIndexRequest(Clock clock, properties.putAll(createAdditionalMappings(config, fieldCapabilitiesResponse)); Map metadata = getOrPutDefault(mappingsAsMap, META, HashMap::new); metadata.putAll(createMetadata(config.getId(), clock, Version.CURRENT)); + Map runtimeMappings = getOrPutDefault(mappingsAsMap, RUNTIME, HashMap::new); + runtimeMappings.putAll(config.getSource().getRuntimeMappings()); return new CreateIndexRequest(destinationIndex, settings).mapping(mappingsAsMap); } @@ -258,8 +266,12 @@ public static void updateMappingsToDestIndex(Client client, ActionListener fieldCapabilitiesListener = ActionListener.wrap( fieldCapabilitiesResponse -> { + Map addedMappings = new HashMap<>(); + // Determine mappings to be added to the destination index - Map addedMappings = Map.of(PROPERTIES, createAdditionalMappings(config, fieldCapabilitiesResponse)); + addedMappings.put(PROPERTIES, createAdditionalMappings(config, fieldCapabilitiesResponse)); + // Also add runtime mappings + addedMappings.put(RUNTIME, config.getSource().getRuntimeMappings()); // Add the mappings to the destination index PutMappingRequest putMappingRequest = @@ -271,22 +283,7 @@ public static void updateMappingsToDestIndex(Client client, listener::onFailure ); - List requiredFields = config.getAnalysis().getRequiredFields(); - if (requiredFields.isEmpty()) { - fieldCapabilitiesListener.onResponse(null); - return; - } - FieldCapabilitiesRequest fieldCapabilitiesRequest = - new FieldCapabilitiesRequest() - .indices(config.getSource().getIndex()) - .fields(requiredFields.stream().map(RequiredField::getName).toArray(String[]::new)); - ClientHelper.executeWithHeadersAsync( - config.getHeaders(), - ML_ORIGIN, - client, - FieldCapabilitiesAction.INSTANCE, - fieldCapabilitiesRequest, - fieldCapabilitiesListener); + getFieldCapsForRequiredFields(client, config, fieldCapabilitiesListener); } private static void checkResultsFieldIsNotPresentInProperties(DataFrameAnalyticsConfig config, Map properties) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java index 194d37f5181d0..998b37de15f42 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java @@ -176,6 +176,8 @@ private SearchRequestBuilder buildSearchRequest() { searchRequestBuilder.addDocValueField(docValueField.getSearchField(), docValueField.getDocValueFormat()); } + searchRequestBuilder.setRuntimeMappings(context.runtimeMappings); + return searchRequestBuilder; } @@ -341,7 +343,8 @@ private SearchRequestBuilder buildDataSummarySearchRequestBuilder() { .setIndices(context.indices) .setSize(0) .setQuery(summaryQuery) - .setTrackTotalHits(true); + .setTrackTotalHits(true) + .setRuntimeMappings(context.runtimeMappings); } private QueryBuilder allExtractedFieldsExistQuery() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorContext.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorContext.java index 496c87d7d9c1d..57fe032c107f4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorContext.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorContext.java @@ -26,9 +26,14 @@ public class DataFrameDataExtractorContext { final boolean supportsRowsWithMissingValues; final TrainTestSplitterFactory trainTestSplitterFactory; + // Runtime mappings are necessary while we are still querying the source indices. + // They should be empty when we're querying the destination index as the runtime + // fields should be mapped in the index. + final Map runtimeMappings; + DataFrameDataExtractorContext(String jobId, ExtractedFields extractedFields, List indices, QueryBuilder query, int scrollSize, Map headers, boolean includeSource, boolean supportsRowsWithMissingValues, - TrainTestSplitterFactory trainTestSplitterFactory) { + TrainTestSplitterFactory trainTestSplitterFactory, Map runtimeMappings) { this.jobId = Objects.requireNonNull(jobId); this.extractedFields = Objects.requireNonNull(extractedFields); this.indices = indices.toArray(new String[indices.size()]); @@ -38,5 +43,6 @@ public class DataFrameDataExtractorContext { this.includeSource = includeSource; this.supportsRowsWithMissingValues = supportsRowsWithMissingValues; this.trainTestSplitterFactory = Objects.requireNonNull(trainTestSplitterFactory); + this.runtimeMappings = Objects.requireNonNull(runtimeMappings); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java index 2213d2d607071..9786e04655e3f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java @@ -35,11 +35,12 @@ public class DataFrameDataExtractorFactory { private final Map headers; private final boolean supportsRowsWithMissingValues; private final TrainTestSplitterFactory trainTestSplitterFactory; + private final Map runtimeMappings; private DataFrameDataExtractorFactory(Client client, String analyticsId, List indices, QueryBuilder sourceQuery, ExtractedFields extractedFields, List requiredFields, Map headers, - boolean supportsRowsWithMissingValues, - TrainTestSplitterFactory trainTestSplitterFactory) { + boolean supportsRowsWithMissingValues, TrainTestSplitterFactory trainTestSplitterFactory, + Map runtimeMappings) { this.client = Objects.requireNonNull(client); this.analyticsId = Objects.requireNonNull(analyticsId); this.indices = Objects.requireNonNull(indices); @@ -49,6 +50,7 @@ private DataFrameDataExtractorFactory(Client client, String analyticsId, List fieldCapsPerType = fieldCapabilitiesResponse.getField(constraint.getField()); if (fieldCapsPerType == null) { @@ -171,6 +174,7 @@ private void getFieldCaps(String[] index, DataFrameAnalyticsConfig config, Actio fieldCapabilitiesRequest.indices(index); fieldCapabilitiesRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); fieldCapabilitiesRequest.fields("*"); + fieldCapabilitiesRequest.runtimeFields(config.getSource().getRuntimeMappings()); LOGGER.debug(() -> new ParameterizedMessage( "[{}] Requesting field caps for index {}", config.getId(), Arrays.toString(index))); ClientHelper.executeWithHeaders(config.getHeaders(), ML_ORIGIN, client, () -> { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java index 98549256a97a8..4e284306840be 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java @@ -463,7 +463,7 @@ private static Answer callListenerOnResponse(Response respo private static DataFrameAnalyticsConfig createConfig(DataFrameAnalysis analysis) { return new DataFrameAnalyticsConfig.Builder() .setId(ANALYTICS_ID) - .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, null)) + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, null, null)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, null)) .setAnalysis(analysis) .build(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java index d3450e8dee0ff..5fec5cce2bbe0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java @@ -251,11 +251,11 @@ public void testMergeMappings_GivenSourceFiltering() { } private static DataFrameAnalyticsSource newSource() { - return new DataFrameAnalyticsSource(new String[] {"index"}, null, null); + return new DataFrameAnalyticsSource(new String[] {"index"}, null, null, null); } private static DataFrameAnalyticsSource newSourceWithExcludes(String... excludes) { return new DataFrameAnalyticsSource(new String[] {"index"}, null, - new FetchSourceContext(true, null, excludes)); + new FetchSourceContext(true, null, excludes), null); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorTests.java index fdba987d715c5..da86a89db7ed8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorTests.java @@ -565,7 +565,7 @@ public void testExtractionWithProcessedFieldThrows() { private TestExtractor createExtractor(boolean includeSource, boolean supportsRowsWithMissingValues) { DataFrameDataExtractorContext context = new DataFrameDataExtractorContext(JOB_ID, extractedFields, indices, query, scrollSize, - headers, includeSource, supportsRowsWithMissingValues, trainTestSplitterFactory); + headers, includeSource, supportsRowsWithMissingValues, trainTestSplitterFactory, Collections.emptyMap()); return new TestExtractor(client, context); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java index 11d7acb386443..a8a52673f1627 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java @@ -1137,7 +1137,7 @@ public void testDetect_withFeatureProcessors() { private DataFrameAnalyticsConfig buildOutlierDetectionConfig() { return new DataFrameAnalyticsConfig.Builder() .setId("foo") - .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering)) + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering, null)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD)) .setAnalyzedFields(analyzedFields) .setAnalysis(new OutlierDetection.Builder().build()) @@ -1151,7 +1151,7 @@ private DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable) private DataFrameAnalyticsConfig buildClassificationConfig(String dependentVariable) { return new DataFrameAnalyticsConfig.Builder() .setId("foo") - .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering)) + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering, null)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD)) .setAnalysis(new Classification(dependentVariable)) .build(); @@ -1160,7 +1160,7 @@ private DataFrameAnalyticsConfig buildClassificationConfig(String dependentVaria private DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable, List featureprocessors) { return new DataFrameAnalyticsConfig.Builder() .setId("foo") - .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering)) + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering, null)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD)) .setAnalyzedFields(analyzedFields) .setAnalysis(new Regression(dependentVariable, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunnerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunnerTests.java index 3e0e1bf931488..deb7b6a667254 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunnerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunnerTests.java @@ -71,7 +71,7 @@ public void setupTests() { config = new DataFrameAnalyticsConfig.Builder() .setId("test") .setAnalysis(RegressionTests.createRandom()) - .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("dest_index", "test_results_field")) .build(); progressTracker = ProgressTracker.fromZeroes(config.getAnalysis().getProgressPhases(), config.getAnalysis().supportsInference()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java index 16a89c2192e4d..9d87ed78ed570 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java @@ -82,7 +82,7 @@ public void setUpMocks() { analyticsConfig = new DataFrameAnalyticsConfig.Builder() .setId(JOB_ID) .setDescription(JOB_DESCRIPTION) - .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("my_dest", null)) .setAnalysis(new Regression("foo")) .build(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersisterTests.java index dd190dc062728..4c87429026df9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersisterTests.java @@ -77,7 +77,7 @@ public void testPersistAllDocs() { DataFrameAnalyticsConfig analyticsConfig = new DataFrameAnalyticsConfig.Builder() .setId(JOB_ID) .setDescription(JOB_DESCRIPTION) - .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("my_dest", null)) .setAnalysis(randomBoolean() ? new Regression("foo") : new Classification("foo")) .build(); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml index 51bd2da2c841c..da0df533886fa 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml @@ -2100,3 +2100,39 @@ setup: }} - is_false: data_frame_analytics.0.create_time - is_false: data_frame_analytics.0.version + +--- +"Test put with runtime mappings": + + - do: + ml.put_data_frame_analytics: + id: "with-runtime-mappings" + body: > + { + "source": { + "index": "index-source", + "runtime_mappings": { + "runtime_field": { + "type": "double", + "script": "" + } + } + }, + "dest": { + "index": "index-dest" + }, + "analysis": { + "outlier_detection": { + } + } + } + - match: { id: "with-runtime-mappings" } + - match: { source.index: ["index-source"] } + - match: { source.runtime_mappings: { + "runtime_field": { + "type": "double", + "script": "" + } + } + } + - match: { dest.index: "index-dest" }