From 7fb98c0d3c098b31dd8cb104a01ef75f8ef3bbbd Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 19 Feb 2021 16:29:19 +0200 Subject: [PATCH] [ML] Add runtime mappings to data frame analytics source config (#69183) Users can now specify runtime mappings as part of the source config of a data frame analytics job. Those runtime mappings become part of the mapping of the destination index. This ensures the fields are accessible in the destination index even if the relevant data frame analytics job gets deleted. Closes #65056 --- .../dataframe/DataFrameAnalyticsSource.java | 28 ++++- .../MlClientDocumentationIT.java | 5 +- .../DataFrameAnalyticsSourceTests.java | 12 +- .../ml/put-data-frame-analytics.asciidoc | 3 +- .../apis/put-dfanalytics.asciidoc | 6 +- .../action/search/SearchRequestBuilder.java | 9 ++ .../core/ml/datafeed/DatafeedConfig.java | 25 +--- .../dataframe/DataFrameAnalyticsSource.java | 36 +++++- .../ml/utils/RuntimeMappingsValidator.java | 39 ++++++ .../xpack/core/ml/config_index_mappings.json | 4 + ...tDataFrameAnalyticsActionRequestTests.java | 4 +- .../DataFrameAnalyticsConfigTests.java | 8 +- .../DataFrameAnalyticsSourceTests.java | 33 ++++- .../utils/RuntimeMappingsValidatorTests.java | 46 +++++++ .../ml/integration/ClassificationIT.java | 87 ++++++++++++- .../DataFrameAnalysisCustomFeatureIT.java | 2 +- .../ExplainDataFrameAnalyticsIT.java | 76 ++++++++++- ...NativeDataFrameAnalyticsIntegTestCase.java | 3 +- .../xpack/ml/integration/RegressionIT.java | 82 +++++++++++- .../integration/RunDataFrameAnalyticsIT.java | 118 ++++++++++++++++-- .../ChunkedTrainedModelPersisterIT.java | 2 +- .../ml/integration/UnusedStatsRemoverIT.java | 2 +- .../xpack/ml/dataframe/DestinationIndex.java | 65 +++++----- .../extractor/DataFrameDataExtractor.java | 5 +- .../DataFrameDataExtractorContext.java | 8 +- .../DataFrameDataExtractorFactory.java | 14 ++- .../ExtractedFieldsDetectorFactory.java | 6 +- .../ml/dataframe/DestinationIndexTests.java | 2 +- .../ml/dataframe/MappingsMergerTests.java | 4 +- .../DataFrameDataExtractorTests.java | 2 +- .../ExtractedFieldsDetectorTests.java | 6 +- .../inference/InferenceRunnerTests.java | 2 +- .../AnalyticsResultProcessorTests.java | 2 +- .../ChunkedTrainedModelPersisterTests.java | 2 +- .../test/ml/data_frame_analytics_crud.yml | 36 ++++++ 35 files changed, 665 insertions(+), 119 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidator.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidatorTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java index efe2e6a217e65..6999d1dd4cba7 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java @@ -15,11 +15,13 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; public class DataFrameAnalyticsSource implements ToXContentObject { @@ -45,16 +47,20 @@ public static Builder builder() { (p, c) -> FetchSourceContext.fromXContent(p), _SOURCE, ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING); + PARSER.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD); } private final String[] index; private final QueryConfig queryConfig; private final FetchSourceContext sourceFiltering; + private final Map runtimeMappings; - private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig, @Nullable FetchSourceContext sourceFiltering) { + private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig, @Nullable FetchSourceContext sourceFiltering, + @Nullable Map runtimeMappings) { this.index = Objects.requireNonNull(index); this.queryConfig = queryConfig; this.sourceFiltering = sourceFiltering; + this.runtimeMappings = runtimeMappings; } public String[] getIndex() { @@ -69,6 +75,10 @@ public FetchSourceContext getSourceFiltering() { return sourceFiltering; } + public Map getRuntimeMappings() { + return runtimeMappings; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -79,6 +89,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (sourceFiltering != null) { builder.field(_SOURCE.getPreferredName(), sourceFiltering); } + if (runtimeMappings != null) { + builder.field(SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName(), runtimeMappings); + } builder.endObject(); return builder; } @@ -91,12 +104,13 @@ public boolean equals(Object o) { DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o; return Arrays.equals(index, other.index) && Objects.equals(queryConfig, other.queryConfig) - && Objects.equals(sourceFiltering, other.sourceFiltering); + && Objects.equals(sourceFiltering, other.sourceFiltering) + && Objects.equals(runtimeMappings, other.runtimeMappings); } @Override public int hashCode() { - return Objects.hash(Arrays.asList(index), queryConfig, sourceFiltering); + return Objects.hash(Arrays.asList(index), queryConfig, sourceFiltering, runtimeMappings); } @Override @@ -109,6 +123,7 @@ public static class Builder { private String[] index; private QueryConfig queryConfig; private FetchSourceContext sourceFiltering; + private Map runtimeMappings; private Builder() {} @@ -132,8 +147,13 @@ public Builder setSourceFiltering(FetchSourceContext sourceFiltering) { return this; } + public Builder setRuntimeMappings(Map runtimeMappings) { + this.runtimeMappings = runtimeMappings; + return this; + } + public DataFrameAnalyticsSource build() { - return new DataFrameAnalyticsSource(index, queryConfig, sourceFiltering); + return new DataFrameAnalyticsSource(index, queryConfig, sourceFiltering, runtimeMappings); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 67f5c8a34ec43..f6ac64c0002b3 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -2995,13 +2995,16 @@ public void testPutDataFrameAnalytics() throws Exception { QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder()); // end::put-data-frame-analytics-query-config + Map runtimeMappings = Collections.emptyMap(); + // tag::put-data-frame-analytics-source-config DataFrameAnalyticsSource sourceConfig = DataFrameAnalyticsSource.builder() // <1> .setIndex("put-test-source-index") // <2> .setQueryConfig(queryConfig) // <3> + .setRuntimeMappings(runtimeMappings) // <4> .setSourceFiltering(new FetchSourceContext(true, new String[] { "included_field_1", "included_field_2" }, - new String[] { "excluded_field" })) // <4> + new String[] { "excluded_field" })) // <5> .build(); // end::put-data-frame-analytics-source-config diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java index 3e7dd617fd744..162e80acc9204 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java @@ -16,6 +16,8 @@ import org.elasticsearch.test.AbstractXContentTestCase; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.function.Predicate; import static java.util.Collections.emptyList; @@ -31,11 +33,19 @@ public static DataFrameAnalyticsSource randomSourceConfig() { generateRandomStringArray(10, 10, false, false), generateRandomStringArray(10, 10, false, false)); } - + Map runtimeMappings = null; + if (randomBoolean()) { + runtimeMappings = new HashMap<>(); + Map runtimeField = new HashMap<>(); + runtimeField.put("type", "keyword"); + runtimeField.put("script", ""); + runtimeMappings.put(randomAlphaOfLength(10), runtimeField); + } return DataFrameAnalyticsSource.builder() .setIndex(generateRandomStringArray(10, 10, false, false)) .setQueryConfig(randomBoolean() ? null : randomQueryConfig()) .setSourceFiltering(sourceFiltering) + .setRuntimeMappings(runtimeMappings) .build(); } diff --git a/docs/java-rest/high-level/ml/put-data-frame-analytics.asciidoc b/docs/java-rest/high-level/ml/put-data-frame-analytics.asciidoc index 19b21b0d7e91c..30914d539ce7c 100644 --- a/docs/java-rest/high-level/ml/put-data-frame-analytics.asciidoc +++ b/docs/java-rest/high-level/ml/put-data-frame-analytics.asciidoc @@ -55,7 +55,8 @@ include-tagged::{doc-tests-file}[{api}-source-config] <1> Constructing a new DataFrameAnalyticsSource <2> The source index <3> The query from which to gather the data. If query is not set, a `match_all` query is used by default. -<4> Source filtering to select which fields will exist in the destination index. +<4> Runtime mappings that will be added to the destination index mapping. +<5> Source filtering to select which fields will exist in the destination index. ===== QueryConfig diff --git a/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc b/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc index 5cf3b3ed306b8..848b6eb767304 100644 --- a/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc +++ b/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc @@ -523,7 +523,7 @@ setting, an error occurs when you try to create {dfanalytics-jobs} that have `source`:: (object) The configuration of how to source the analysis data. It requires an `index`. -Optionally, `query` and `_source` may be specified. +Optionally, `query`, `runtime_mappings`, and `_source` may be specified. + .Properties of `source` [%collapsible%open] @@ -543,6 +543,10 @@ options that are supported by {es} can be used, as this object is passed verbatim to {es}. By default, this property has the following value: `{"match_all": {}}`. +`runtime_mappings`::: +(Optional, object) Definitions of runtime fields that will become part of the +mapping of the destination index. + `_source`::: (Optional, object) Specify `includes` and/or `excludes` patterns to select which fields will be present in the destination. Fields that are excluded cannot be diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 0e07ff1c3fe25..9c74882fa9c96 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; /** * A search action request builder. @@ -592,4 +593,12 @@ public SearchRequestBuilder setPreFilterShardSize(int preFilterShardSize) { this.request.setPreFilterShardSize(preFilterShardSize); return this; } + + /** + * Set runtime mappings to create runtime fields that exist only as part of this particular search. + */ + public SearchRequestBuilder setRuntimeMappings(Map runtimeMappings) { + sourceBuilder().runtimeMappings(runtimeMappings); + return this; + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index c0230d13ea7aa..d2fe0b1831b72 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.ml.utils.QueryProvider; +import org.elasticsearch.xpack.core.ml.utils.RuntimeMappingsValidator; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer; @@ -825,7 +826,7 @@ public DatafeedConfig build() { } validateScriptFields(); - validateRuntimeMappings(); + RuntimeMappingsValidator.validate(runtimeMappings); setDefaultChunkingConfig(); setDefaultQueryDelay(); @@ -846,28 +847,6 @@ void validateScriptFields() { } } - /** - * Perform a light check that the structure resembles runtime_mappings. - * The full check cannot happen until search - */ - void validateRuntimeMappings() { - for (Map.Entry entry : runtimeMappings.entrySet()) { - // top level objects are fields - String fieldName = entry.getKey(); - if (entry.getValue() instanceof Map) { - @SuppressWarnings("unchecked") - Map propNode = new HashMap<>(((Map) entry.getValue())); - Object typeNode = propNode.get("type"); - if (typeNode == null) { - throw ExceptionsHelper.badRequestException("No type specified for runtime field [" + fieldName + "]"); - } - } else { - throw ExceptionsHelper.badRequestException("Expected map for runtime field [" + fieldName + "] " + - "definition but got a " + fieldName.getClass().getSimpleName()); - } - } - } - private static void checkNoMoreHistogramAggregations(Collection aggregations) { for (AggregationBuilder agg : aggregations) { if (ExtractorUtils.isHistogram(agg)) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSource.java index 13e06ae2e4df0..b0db6d7083e4b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSource.java @@ -21,15 +21,19 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.QueryProvider; +import org.elasticsearch.xpack.core.ml.utils.RuntimeMappingsValidator; import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -45,7 +49,8 @@ public static ConstructingObjectParser createPar ignoreUnknownFields, a -> new DataFrameAnalyticsSource( ((List) a[0]).toArray(new String[0]), (QueryProvider) a[1], - (FetchSourceContext) a[2])); + (FetchSourceContext) a[2], + (Map) a[3])); parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDEX); parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> QueryProvider.fromXContent(p, ignoreUnknownFields, Messages.DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT), QUERY); @@ -53,14 +58,18 @@ public static ConstructingObjectParser createPar (p, c) -> FetchSourceContext.fromXContent(p), _SOURCE, ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING); + parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), + SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD); return parser; } private final String[] index; private final QueryProvider queryProvider; private final FetchSourceContext sourceFiltering; + private final Map runtimeMappings; - public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider, @Nullable FetchSourceContext sourceFiltering) { + public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider, @Nullable FetchSourceContext sourceFiltering, + @Nullable Map runtimeMappings) { this.index = ExceptionsHelper.requireNonNull(index, INDEX); if (index.length == 0) { throw new IllegalArgumentException("source.index must specify at least one index"); @@ -73,6 +82,8 @@ public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryPro throw new IllegalArgumentException("source._source cannot be disabled"); } this.sourceFiltering = sourceFiltering; + this.runtimeMappings = runtimeMappings == null ? Collections.emptyMap() : Collections.unmodifiableMap(runtimeMappings); + RuntimeMappingsValidator.validate(this.runtimeMappings); } public DataFrameAnalyticsSource(StreamInput in) throws IOException { @@ -83,6 +94,11 @@ public DataFrameAnalyticsSource(StreamInput in) throws IOException { } else { sourceFiltering = null; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + runtimeMappings = in.readMap(); + } else { + runtimeMappings = Collections.emptyMap(); + } } public DataFrameAnalyticsSource(DataFrameAnalyticsSource other) { @@ -90,6 +106,7 @@ public DataFrameAnalyticsSource(DataFrameAnalyticsSource other) { this.queryProvider = new QueryProvider(other.queryProvider); this.sourceFiltering = other.sourceFiltering == null ? null : new FetchSourceContext( other.sourceFiltering.fetchSource(), other.sourceFiltering.includes(), other.sourceFiltering.excludes()); + this.runtimeMappings = Collections.unmodifiableMap(new HashMap<>(other.runtimeMappings)); } @Override @@ -99,6 +116,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_6_0)) { out.writeOptionalWriteable(sourceFiltering); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeMap(runtimeMappings); + } } @Override @@ -109,6 +129,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (sourceFiltering != null) { builder.field(_SOURCE.getPreferredName(), sourceFiltering); } + if (runtimeMappings.isEmpty() == false) { + builder.field(SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName(), runtimeMappings); + } builder.endObject(); return builder; } @@ -121,12 +144,13 @@ public boolean equals(Object o) { DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o; return Arrays.equals(index, other.index) && Objects.equals(queryProvider, other.queryProvider) - && Objects.equals(sourceFiltering, other.sourceFiltering); + && Objects.equals(sourceFiltering, other.sourceFiltering) + && Objects.equals(runtimeMappings, other.runtimeMappings); } @Override public int hashCode() { - return Objects.hash(Arrays.asList(index), queryProvider, sourceFiltering); + return Objects.hash(Arrays.asList(index), queryProvider, sourceFiltering, runtimeMappings); } public String[] getIndex() { @@ -189,6 +213,10 @@ Map getQuery() { return queryProvider.getQuery(); } + public Map getRuntimeMappings() { + return runtimeMappings; + } + public boolean isFieldExcluded(String path) { if (sourceFiltering == null) { return false; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidator.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidator.java new file mode 100644 index 0000000000000..ae7037d15bd02 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidator.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.ml.utils; + +import java.util.HashMap; +import java.util.Map; + +public final class RuntimeMappingsValidator { + + /** + * Perform a light check that the structure resembles runtime_mappings. + * The full check cannot happen until search + */ + public static void validate(Map runtimeMappings) { + for (Map.Entry entry : runtimeMappings.entrySet()) { + // top level objects are fields + String fieldName = entry.getKey(); + if (entry.getValue() instanceof Map) { + @SuppressWarnings("unchecked") + Map propNode = new HashMap<>(((Map) entry.getValue())); + Object typeNode = propNode.get("type"); + if (typeNode == null) { + throw ExceptionsHelper.badRequestException("No type specified for runtime field [{}]", fieldName); + } + } else { + throw ExceptionsHelper.badRequestException( + "Expected map for runtime field [{}] definition but got a {}", + fieldName, + fieldName.getClass().getSimpleName() + ); + } + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json index f01e1d9a61abc..68a063acbbb10 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json @@ -464,6 +464,10 @@ "query" : { "type" : "object", "enabled" : false + }, + "runtime_mappings" : { + "type" : "object", + "enabled" : false } } }, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsActionRequestTests.java index a3ff6392a1647..cad20194dfc59 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsActionRequestTests.java @@ -81,7 +81,7 @@ protected Request doParseInstance(XContentParser parser) { public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsExcludedInSourceFiltering() { DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] {"index"}, null, - new FetchSourceContext(true, null, new String[] {"excluded"})); + new FetchSourceContext(true, null, new String[] {"excluded"}), null); FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"excluded"}, null); DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId("foo") @@ -99,7 +99,7 @@ public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsExcludedInSo public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsIncludedInSourceFiltering() { DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] {"index"}, null, - new FetchSourceContext(true, new String[] {"included"}, null)); + new FetchSourceContext(true, new String[] {"included"}, null), null); FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"included"}, null); DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId("foo") diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java index 86e5ee3b365c2..b8c9bfaf71dcb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java @@ -468,7 +468,7 @@ public void testToXContent_GivenAnalysisWithRandomizeSeedAndVersionIsCurrent() t DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setVersion(Version.CURRENT) .setId("test_config") - .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("dest_index", null)) .setAnalysis(regression) .build(); @@ -487,7 +487,7 @@ public void testToXContent_GivenAnalysisWithRandomizeSeedAndVersionIsBeforeItWas DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setVersion(Version.V_7_5_0) .setId("test_config") - .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("dest_index", null)) .setAnalysis(regression) .build(); @@ -509,7 +509,7 @@ public void testExtractJobIdFromDocId() { public void testCtor_GivenMaxNumThreadsIsZero() { ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder() .setId("test_config") - .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("dest_index", null)) .setAnalysis(new Regression("foo")) .setMaxNumThreads(0) @@ -522,7 +522,7 @@ public void testCtor_GivenMaxNumThreadsIsZero() { public void testCtor_GivenMaxNumThreadsIsNegative() { ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder() .setId("test_config") - .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("dest_index", null)) .setAnalysis(new Regression("foo")) .setMaxNumThreads(randomIntBetween(Integer.MIN_VALUE, 0)) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java index 9a7edca44953f..25b4d17a00bcb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.core.ml.dataframe; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; @@ -22,7 +23,9 @@ import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -68,12 +71,24 @@ public static DataFrameAnalyticsSource createRandom() { generateRandomStringArray(10, 10, false, false), generateRandomStringArray(10, 10, false, false)); } - return new DataFrameAnalyticsSource(index, queryProvider, sourceFiltering); + Map runtimeMappings = null; + if (randomBoolean()) { + runtimeMappings = new HashMap<>(); + Map runtimeField = new HashMap<>(); + runtimeField.put("type", "keyword"); + runtimeField.put("script", ""); + runtimeMappings.put(randomAlphaOfLength(10), runtimeField); + + } + return new DataFrameAnalyticsSource(index, queryProvider, sourceFiltering, runtimeMappings); } public static DataFrameAnalyticsSource mutateForVersion(DataFrameAnalyticsSource instance, Version version) { if (version.before(Version.V_7_6_0)) { - return new DataFrameAnalyticsSource(instance.getIndex(), instance.getQueryProvider(), null); + return new DataFrameAnalyticsSource(instance.getIndex(), instance.getQueryProvider(), null, null); + } + if (version.before(Version.V_8_0_0)) { + return new DataFrameAnalyticsSource(instance.getIndex(), instance.getQueryProvider(), instance.getSourceFiltering(), null); } return instance; } @@ -85,18 +100,24 @@ protected Writeable.Reader instanceReader() { public void testConstructor_GivenDisabledSource() { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new DataFrameAnalyticsSource( - new String[] {"index"}, null, new FetchSourceContext(false, null, null))); + new String[] {"index"}, null, new FetchSourceContext(false, null, null), null)); assertThat(e.getMessage(), equalTo("source._source cannot be disabled")); } + public void testConstructor_GivenInvalidRuntimeMappings() { + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsSource( + new String[] {"index"}, null, null, Collections.singletonMap("typeless", Collections.singletonMap("not a type", "42")))); + assertThat(e.getMessage(), equalTo("No type specified for runtime field [typeless]")); + } + public void testIsFieldExcluded_GivenNoSourceFiltering() { - DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null, null); + DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null, null, null); assertThat(source.isFieldExcluded(randomAlphaOfLength(10)), is(false)); } public void testIsFieldExcluded_GivenSourceFilteringWithNulls() { DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null, - new FetchSourceContext(true, null, null)); + new FetchSourceContext(true, null, null), null); assertThat(source.isFieldExcluded(randomAlphaOfLength(10)), is(false)); } @@ -139,7 +160,7 @@ private static DataFrameAnalyticsSource newSourceWithExcludes(String... excludes private static DataFrameAnalyticsSource newSourceWithIncludesExcludes(List includes, List excludes) { FetchSourceContext sourceFiltering = new FetchSourceContext(true, includes.toArray(new String[0]), excludes.toArray(new String[0])); - return new DataFrameAnalyticsSource(new String[] { "index" } , null, sourceFiltering); + return new DataFrameAnalyticsSource(new String[] { "index" } , null, sourceFiltering, null); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidatorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidatorTests.java new file mode 100644 index 0000000000000..308dc6d693e81 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/RuntimeMappingsValidatorTests.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.ml.utils; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class RuntimeMappingsValidatorTests extends ESTestCase { + + public void testValidate_GivenFieldWhoseValueIsNotMap() { + Map runtimeMappings = Collections.singletonMap("mapless", "not a map"); + + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> RuntimeMappingsValidator.validate(runtimeMappings)); + + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("Expected map for runtime field [mapless] definition but got a String")); + } + + public void testValidate_GivenFieldWithoutType() { + Map fieldMapping = Collections.singletonMap("not a type", "42"); + Map runtimeMappings = Collections.singletonMap("typeless", fieldMapping); + + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> RuntimeMappingsValidator.validate(runtimeMappings)); + + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("No type specified for runtime field [typeless]")); + } + + public void testValidate_GivenValid() { + Map fieldMapping = Collections.singletonMap("type", "keyword"); + Map runtimeMappings = Collections.singletonMap("valid_field", fieldMapping); + + RuntimeMappingsValidator.validate(runtimeMappings); + } +} diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index a03ff5a8c07a1..61d2696eca114 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -28,15 +28,19 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetTrainedModelsAction; import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams; import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification; import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression; import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.Accuracy; import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.AucRoc; import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.MulticlassConfusionMatrix; @@ -47,7 +51,6 @@ import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig; import org.elasticsearch.xpack.core.ml.inference.preprocessing.OneHotEncoding; import org.elasticsearch.xpack.core.ml.inference.preprocessing.PreProcessor; -import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; import org.junit.After; import org.junit.Before; @@ -55,6 +58,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -62,12 +66,12 @@ import static java.util.stream.Collectors.toList; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.emptyString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.in; @@ -810,6 +814,85 @@ public void testTooLowConfiguredMemoryStillStarts() throws Exception { waitUntilAnalyticsIsStopped(jobId); } + public void testWithSearchRuntimeMappings() throws Exception { + initialize("classification_with_search_runtime_mappings"); + indexData(sourceIndex, 300, 50, KEYWORD_FIELD); + + String numericRuntimeField = NUMERICAL_FIELD + "_runtime"; + String dependentVariableRuntimeField = KEYWORD_FIELD + "_runtime"; + + String predictedClassField = dependentVariableRuntimeField + "_prediction"; + + Map numericRuntimeFieldMapping = new HashMap<>(); + numericRuntimeFieldMapping.put("type", "double"); + numericRuntimeFieldMapping.put("script", "emit(doc['" + NUMERICAL_FIELD + "'].value)"); + Map dependentVariableRuntimeFieldMapping = new HashMap<>(); + dependentVariableRuntimeFieldMapping.put("type", "keyword"); + dependentVariableRuntimeFieldMapping.put("script", + "if (doc['" + KEYWORD_FIELD + "'].size() > 0) { emit(doc['" + KEYWORD_FIELD + "'].value); }"); + Map runtimeFields = new HashMap<>(); + runtimeFields.put(numericRuntimeField, numericRuntimeFieldMapping); + runtimeFields.put(dependentVariableRuntimeField, dependentVariableRuntimeFieldMapping); + + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId(jobId) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, runtimeFields)) + .setDest(new DataFrameAnalyticsDest(destIndex, null)) + .setAnalyzedFields(new FetchSourceContext(true, new String[] { numericRuntimeField, dependentVariableRuntimeField }, null)) + .setAnalysis(new Classification( + dependentVariableRuntimeField, + BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(), + predictedClassField, + null, + null, + null, + null, + null, + null)) + .build(); + + putAnalytics(config); + + assertIsStopped(jobId); + assertProgressIsZero(jobId); + + startAnalytics(jobId); + waitUntilAnalyticsIsStopped(jobId); + + client().admin().indices().refresh(new RefreshRequest(destIndex)); + SearchResponse destData = client().prepareSearch(destIndex).setTrackTotalHits(true).setSize(1000).get(); + for (SearchHit hit : destData.getHits()) { + Map destDoc = hit.getSourceAsMap(); + Map resultsObject = getFieldValue(destDoc, "ml"); + assertThat(getFieldValue(resultsObject, predictedClassField), is(in(KEYWORD_FIELD_VALUES))); + assertThat(getFieldValue(resultsObject, "is_training"), is(destDoc.containsKey(KEYWORD_FIELD))); + assertTopClasses(resultsObject, 2, dependentVariableRuntimeField, KEYWORD_FIELD_VALUES); + @SuppressWarnings("unchecked") + List> importanceArray = (List>)resultsObject.get("feature_importance"); + assertThat(importanceArray, hasSize(1)); + assertThat(importanceArray.get(0), hasEntry("feature_name", numericRuntimeField)); + } + + assertProgressComplete(jobId); + assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L)); + assertModelStatePersisted(stateDocId()); + assertExactlyOneInferenceModelPersisted(jobId); + assertMlResultsFieldMappings(destIndex, predictedClassField, "keyword"); + assertThatAuditMessagesMatch(jobId, + "Created analytics with analysis type [classification]", + "Estimated memory usage for this analytics to be", + "Starting analytics on node", + "Started analytics", + expectedDestIndexAuditMessage(), + "Started reindexing to destination index [" + destIndex + "]", + "Finished reindexing to destination index [" + destIndex + "]", + "Started loading data", + "Started analyzing", + "Started writing results", + "Finished analysis"); + assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField); + } + private static T getOnlyElement(List list) { assertThat(list, hasSize(1)); return list.get(0); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalysisCustomFeatureIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalysisCustomFeatureIT.java index e85dbdb58cc67..c4da2e83758ae 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalysisCustomFeatureIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalysisCustomFeatureIT.java @@ -110,7 +110,7 @@ public void testNGramCustomFeature() throws Exception { DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(jobId) .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, - QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()), null)) + QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()), null, null)) .setDest(new DataFrameAnalyticsDest(destIndex, null)) .setAnalysis(new Regression(NUMERICAL_FIELD, BoostedTreeParams.builder().setNumTopFeatureImportanceValues(6).build(), diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java index 289a0d64d7631..3d20104430c8b 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java @@ -8,6 +8,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; @@ -17,25 +18,33 @@ import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams; import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification; -import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression; import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression; +import org.elasticsearch.xpack.core.ml.dataframe.explain.FieldSelection; import org.elasticsearch.xpack.core.ml.utils.QueryProvider; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; public class ExplainDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTestCase { public void testExplain_GivenMissingSourceIndex() { DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() - .setSource(new DataFrameAnalyticsSource(new String[] {"missing_index"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"missing_index"}, null, null, Collections.emptyMap())) .setAnalysis(new OutlierDetection.Builder().build()) .buildForExplain(); @@ -81,7 +90,8 @@ public void testSourceQueryIsApplied() throws IOException { .setId(id) .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, QueryProvider.fromParsedQuery(QueryBuilders.termQuery("filtered_field", "bingo")), - null)) + null, + Collections.emptyMap())) .setAnalysis(new Classification("categorical")) .buildForExplain(); @@ -98,7 +108,8 @@ public void testTrainingPercentageIsApplied() throws IOException { .setId("dfa-training-100-" + sourceIndex) .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()), - null)) + null, + Collections.emptyMap())) .setAnalysis(new Regression(RegressionIT.DEPENDENT_VARIABLE_FIELD, BoostedTreeParams.builder().build(), null, @@ -118,7 +129,8 @@ public void testTrainingPercentageIsApplied() throws IOException { .setId("dfa-training-50-" + sourceIndex) .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()), - null)) + null, + Collections.emptyMap())) .setAnalysis(new Regression(RegressionIT.DEPENDENT_VARIABLE_FIELD, BoostedTreeParams.builder().build(), null, @@ -147,7 +159,8 @@ public void testSimultaneousExplainSameConfig() throws IOException { .setId("dfa-simultaneous-explain-" + sourceIndex) .setSource(new DataFrameAnalyticsSource(new String[]{sourceIndex}, QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()), - null)) + null, + Collections.emptyMap())) .setAnalysis(new Regression(RegressionIT.DEPENDENT_VARIABLE_FIELD, BoostedTreeParams.builder().build(), null, @@ -181,6 +194,57 @@ public void testSimultaneousExplainSameConfig() throws IOException { } } + public void testRuntimeFields() { + String sourceIndex = "test-explain-runtime-fields"; + String mapping = "{\n" + + " \"properties\": {\n" + + " \"mapped_field\": {\n" + + " \"type\": \"double\"\n" + + " }\n" + + " },\n" + + " \"runtime\": {\n" + + " \"mapped_runtime_field\": {\n" + + " \"type\": \"double\"\n," + + " \"script\": \"emit(doc['mapped_field'].value + 10.0)\"\n" + + " }\n" + + " }\n" + + " }"; + client().admin().indices().prepareCreate(sourceIndex) + .setMapping(mapping) + .get(); + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk() + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < 10; i++) { + Object[] source = new Object[] {"mapped_field", i}; + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source).opType(DocWriteRequest.OpType.CREATE); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } + + Map configRuntimeField = new HashMap<>(); + configRuntimeField.put("type", "double"); + configRuntimeField.put("script", "emit(doc['mapped_field'].value + 20.0)"); + Map configRuntimeFields = Collections.singletonMap("config_runtime_field", configRuntimeField); + + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId(sourceIndex + "-job") + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, configRuntimeFields)) + .setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null)) + .setAnalysis(new OutlierDetection.Builder().build()) + .build(); + + ExplainDataFrameAnalyticsAction.Response explainResponse = explainDataFrame(config); + List fieldSelection = explainResponse.getFieldSelection(); + + assertThat(fieldSelection.size(), equalTo(3)); + assertThat(fieldSelection.stream().map(FieldSelection::getName).collect(Collectors.toList()), + contains("config_runtime_field", "mapped_field", "mapped_runtime_field")); + assertThat(fieldSelection.stream().map(FieldSelection::isIncluded).allMatch(isIncluded -> isIncluded), is(true)); + } + @Override boolean supportsInference() { return false; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index fb34d4cc27975..04937e60cd7ab 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -200,7 +200,8 @@ protected static DataFrameAnalyticsConfig buildAnalytics(String id, String sourc QueryBuilder queryBuilder) throws Exception { return new DataFrameAnalyticsConfig.Builder() .setId(id) - .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, QueryProvider.fromParsedQuery(queryBuilder), null)) + .setSource(new DataFrameAnalyticsSource( + new String[] { sourceIndex }, QueryProvider.fromParsedQuery(queryBuilder), null, Collections.emptyMap())) .setDest(new DataFrameAnalyticsDest(destIndex, resultsField)) .setAnalysis(analysis) .build(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index e676d4adb9453..58810a7653b12 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -50,6 +51,8 @@ import static org.hamcrest.Matchers.emptyString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @@ -572,7 +575,7 @@ public void testAliasFields() throws Exception { null); DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(jobId) - .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, Collections.emptyMap())) .setDest(new DataFrameAnalyticsDest(destIndex, null)) .setAnalysis(regression) .setAnalyzedFields(new FetchSourceContext(true, null, new String[] {"field_1"})) @@ -693,6 +696,83 @@ public void testWithCustomFeatureProcessors() throws Exception { } } + public void testWithSearchRuntimeMappings() throws Exception { + initialize("regression_with_search_runtime_mappings"); + indexData(sourceIndex, 300, 50); + + String numericRuntimeField = NUMERICAL_FEATURE_FIELD + "_runtime"; + String dependentVariableRuntimeField = DEPENDENT_VARIABLE_FIELD + "_runtime"; + + String predictedClassField = dependentVariableRuntimeField + "_prediction"; + + Map numericRuntimeFieldMapping = new HashMap<>(); + numericRuntimeFieldMapping.put("type", "double"); + numericRuntimeFieldMapping.put("script", "emit(doc['" + NUMERICAL_FEATURE_FIELD + "'].value)"); + Map dependentVariableRuntimeFieldMapping = new HashMap<>(); + dependentVariableRuntimeFieldMapping.put("type", "double"); + dependentVariableRuntimeFieldMapping.put("script", + "if (doc['" + DEPENDENT_VARIABLE_FIELD + "'].size() > 0) { emit(doc['" + DEPENDENT_VARIABLE_FIELD + "'].value); }"); + Map runtimeFields = new HashMap<>(); + runtimeFields.put(numericRuntimeField, numericRuntimeFieldMapping); + runtimeFields.put(dependentVariableRuntimeField, dependentVariableRuntimeFieldMapping); + + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId(jobId) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, runtimeFields)) + .setDest(new DataFrameAnalyticsDest(destIndex, null)) + .setAnalyzedFields(new FetchSourceContext(true, new String[] { numericRuntimeField, dependentVariableRuntimeField }, null)) + .setAnalysis(new Regression( + dependentVariableRuntimeField, + BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(), + null, + null, + null, + null, + null, + null, + null)) + .build(); + putAnalytics(config); + + assertIsStopped(jobId); + assertProgressIsZero(jobId); + + startAnalytics(jobId); + waitUntilAnalyticsIsStopped(jobId); + + SearchResponse destData = client().prepareSearch(destIndex).setTrackTotalHits(true).setSize(1000).get(); + for (SearchHit hit : destData.getHits()) { + Map destDoc = hit.getSourceAsMap(); + Map resultsObject = getMlResultsObjectFromDestDoc(destDoc); + + assertThat(resultsObject.containsKey(predictedClassField), is(true)); + assertThat(resultsObject.containsKey("is_training"), is(true)); + assertThat(resultsObject.get("is_training"), is(destDoc.containsKey(DEPENDENT_VARIABLE_FIELD))); + @SuppressWarnings("unchecked") + List> importanceArray = (List>)resultsObject.get("feature_importance"); + assertThat(importanceArray, hasSize(1)); + assertThat(importanceArray.get(0), hasEntry("feature_name", numericRuntimeField)); + } + + assertProgressComplete(jobId); + assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L)); + assertModelStatePersisted(stateDocId()); + assertExactlyOneInferenceModelPersisted(jobId); + assertMlResultsFieldMappings(destIndex, predictedClassField, "double"); + assertThatAuditMessagesMatch(jobId, + "Created analytics with analysis type [regression]", + "Estimated memory usage for this analytics to be", + "Starting analytics on node", + "Started analytics", + "Creating destination index [" + destIndex + "]", + "Started reindexing to destination index [" + destIndex + "]", + "Finished reindexing to destination index [" + destIndex + "]", + "Started loading data", + "Started analyzing", + "Started writing results", + "Finished analysis"); + } + private void initialize(String jobId) { this.jobId = jobId; this.sourceIndex = jobId + "_source_index"; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index f2d4d2ebe526d..8ba7fcb6c34b1 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -10,8 +10,6 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; @@ -29,15 +27,15 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection; -import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; import org.junit.After; import org.junit.Before; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.emptyString; import static org.hamcrest.Matchers.equalTo; @@ -391,7 +389,7 @@ public void testOutlierDetectionWithMultipleSourceIndices() throws Exception { String id = "test_outlier_detection_with_multiple_source_indices"; DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(id) - .setSource(new DataFrameAnalyticsSource(sourceIndex, null, null)) + .setSource(new DataFrameAnalyticsSource(sourceIndex, null, null, null)) .setDest(new DataFrameAnalyticsDest(destIndex, null)) .setAnalysis(new OutlierDetection.Builder().build()) .build(); @@ -513,7 +511,7 @@ public void testModelMemoryLimitLowerThanEstimatedMemoryUsage() throws Exception ByteSizeValue modelMemoryLimit = ByteSizeValue.ofMb(1); DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(id) - .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, null)) .setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null)) .setAnalysis(new OutlierDetection.Builder().build()) .setModelMemoryLimit(modelMemoryLimit) @@ -550,7 +548,7 @@ public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws ByteSizeValue modelMemoryLimit = ByteSizeValue.ofTb(1); DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(id) - .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, null)) .setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null)) .setAnalysis(new OutlierDetection.Builder().build()) .setModelMemoryLimit(modelMemoryLimit) @@ -834,6 +832,112 @@ public void testOutlierDetection_GivenIndexWithRuntimeFields() throws Exception "Finished analysis"); } + public void testOutlierDetection_GivenSearchRuntimeMappings() throws Exception { + String sourceIndex = "test-outlier-detection-index-with-search-runtime-fields"; + + String mappings = "{\"enabled\": false}"; + + client().admin().indices().prepareCreate(sourceIndex) + .setMapping(mappings) + .get(); + + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); + + // We insert one odd value out of 5 for one feature + String docId = i == 0 ? "outlier" : "normal" + i; + indexRequest.id(docId); + indexRequest.source("numeric", i == 0 ? 100.0 : 1.0); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } + + String id = "test_outlier_detection_index_with_search_runtime_fields"; + + Map runtimeMappings = new HashMap<>(); + Map numericFieldRuntimeMapping = new HashMap<>(); + numericFieldRuntimeMapping.put("type", "double"); + numericFieldRuntimeMapping.put("script", "emit(params._source.numeric)"); + runtimeMappings.put("runtime_numeric", numericFieldRuntimeMapping); + + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId(id) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, runtimeMappings)) + .setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null)) + .setAnalysis(new OutlierDetection.Builder().build()) + .build(); + putAnalytics(config); + + assertIsStopped(id); + assertProgressIsZero(id); + + startAnalytics(id); + waitUntilAnalyticsIsStopped(id); + GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id); + assertThat(stats.getDataCounts().getJobId(), equalTo(id)); + assertThat(stats.getDataCounts().getTrainingDocsCount(), equalTo(5L)); + assertThat(stats.getDataCounts().getTestDocsCount(), equalTo(0L)); + assertThat(stats.getDataCounts().getSkippedDocsCount(), equalTo(0L)); + + SearchResponse sourceData = client().prepareSearch(sourceIndex).get(); + double scoreOfOutlier = 0.0; + double scoreOfNonOutlier = -1.0; + for (SearchHit hit : sourceData.getHits()) { + GetResponse destDocGetResponse = client().prepareGet().setIndex(config.getDest().getIndex()).setId(hit.getId()).get(); + assertThat(destDocGetResponse.isExists(), is(true)); + Map sourceDoc = hit.getSourceAsMap(); + Map destDoc = destDocGetResponse.getSource(); + for (String field : sourceDoc.keySet()) { + assertThat(destDoc.containsKey(field), is(true)); + assertThat(destDoc.get(field), equalTo(sourceDoc.get(field))); + } + assertThat(destDoc.containsKey("ml"), is(true)); + + @SuppressWarnings("unchecked") + Map resultsObject = (Map) destDoc.get("ml"); + + assertThat(resultsObject.containsKey("outlier_score"), is(true)); + double outlierScore = (double) resultsObject.get("outlier_score"); + assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(1.0))); + if (hit.getId().equals("outlier")) { + scoreOfOutlier = outlierScore; + + @SuppressWarnings("unchecked") + List> featureInfluence = (List>) resultsObject.get("feature_influence"); + assertThat(featureInfluence.size(), equalTo(1)); + assertThat(featureInfluence.get(0).get("feature_name"), equalTo("runtime_numeric")); + } else { + if (scoreOfNonOutlier < 0) { + scoreOfNonOutlier = outlierScore; + } else { + assertThat(outlierScore, equalTo(scoreOfNonOutlier)); + } + } + } + assertThat(scoreOfOutlier, is(greaterThan(scoreOfNonOutlier))); + + assertProgressComplete(id); + assertThat(searchStoredProgress(id).getHits().getTotalHits().value, equalTo(1L)); + assertThatAuditMessagesMatch(id, + "Created analytics with analysis type [outlier_detection]", + "Estimated memory usage for this analytics to be", + "Starting analytics on node", + "Started analytics", + "Creating destination index [" + sourceIndex + "-results]", + "Started reindexing to destination index [" + sourceIndex + "-results]", + "Finished reindexing to destination index [" + sourceIndex + "-results]", + "Started loading data", + "Started analyzing", + "Started writing results", + "Finished analysis"); + } + @Override boolean supportsInference() { return false; diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedModelPersisterIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedModelPersisterIT.java index cb68f15e750bc..e9876ac04ddfd 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedModelPersisterIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedModelPersisterIT.java @@ -71,7 +71,7 @@ public void testStoreModelViaChunkedPersister() throws IOException { String modelId = "stored-chunked-model"; DataFrameAnalyticsConfig analyticsConfig = new DataFrameAnalyticsConfig.Builder() .setId(modelId) - .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("my_dest", null)) .setAnalysis(new Regression("foo")) .build(); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java index 95034146eba49..0b2bbaf5eaf03 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java @@ -64,7 +64,7 @@ public void testRemoveUnusedStats() throws Exception { PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(new DataFrameAnalyticsConfig.Builder() .setId("analytics-with-stats") .setModelMemoryLimit(ByteSizeValue.ofGb(1)) - .setSource(new DataFrameAnalyticsSource(new String[]{"foo"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[]{"foo"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("bar", null)) .setAnalysis(new Regression("prediction")) .build()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java index e66b1a4ce6a13..5cb2a709924bb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java @@ -72,6 +72,7 @@ public final class DestinationIndex { private static final String PROPERTIES = "properties"; private static final String META = "_meta"; + private static final String RUNTIME = "runtime"; private static final String DFA_CREATOR = "data-frame-analytics"; @@ -124,23 +125,7 @@ private static void prepareCreateIndexRequest(Client client, Clock clock, DataFr ActionListener mappingsListener = ActionListener.wrap( mappings -> { mappingsHolder.set(mappings); - - List requiredFields = config.getAnalysis().getRequiredFields(); - if (requiredFields.isEmpty()) { - fieldCapabilitiesListener.onResponse(null); - return; - } - FieldCapabilitiesRequest fieldCapabilitiesRequest = - new FieldCapabilitiesRequest() - .indices(config.getSource().getIndex()) - .fields(requiredFields.stream().map(RequiredField::getName).toArray(String[]::new)); - ClientHelper.executeWithHeadersAsync( - config.getHeaders(), - ML_ORIGIN, - client, - FieldCapabilitiesAction.INSTANCE, - fieldCapabilitiesRequest, - fieldCapabilitiesListener); + getFieldCapsForRequiredFields(client, config, fieldCapabilitiesListener); }, listener::onFailure ); @@ -167,6 +152,27 @@ private static void prepareCreateIndexRequest(Client client, Clock clock, DataFr config.getHeaders(), ML_ORIGIN, client, GetSettingsAction.INSTANCE, getSettingsRequest, getSettingsResponseListener); } + private static void getFieldCapsForRequiredFields(Client client, DataFrameAnalyticsConfig config, + ActionListener listener) { + List requiredFields = config.getAnalysis().getRequiredFields(); + if (requiredFields.isEmpty()) { + listener.onResponse(null); + return; + } + FieldCapabilitiesRequest fieldCapabilitiesRequest = + new FieldCapabilitiesRequest() + .indices(config.getSource().getIndex()) + .fields(requiredFields.stream().map(RequiredField::getName).toArray(String[]::new)) + .runtimeFields(config.getSource().getRuntimeMappings()); + ClientHelper.executeWithHeadersAsync( + config.getHeaders(), + ML_ORIGIN, + client, + FieldCapabilitiesAction.INSTANCE, + fieldCapabilitiesRequest, + listener); + } + private static CreateIndexRequest createIndexRequest(Clock clock, DataFrameAnalyticsConfig config, Settings settings, @@ -179,6 +185,8 @@ private static CreateIndexRequest createIndexRequest(Clock clock, properties.putAll(createAdditionalMappings(config, fieldCapabilitiesResponse)); Map metadata = getOrPutDefault(mappingsAsMap, META, HashMap::new); metadata.putAll(createMetadata(config.getId(), clock, Version.CURRENT)); + Map runtimeMappings = getOrPutDefault(mappingsAsMap, RUNTIME, HashMap::new); + runtimeMappings.putAll(config.getSource().getRuntimeMappings()); return new CreateIndexRequest(destinationIndex, settings).mapping(mappingsAsMap); } @@ -258,8 +266,12 @@ public static void updateMappingsToDestIndex(Client client, ActionListener fieldCapabilitiesListener = ActionListener.wrap( fieldCapabilitiesResponse -> { + Map addedMappings = new HashMap<>(); + // Determine mappings to be added to the destination index - Map addedMappings = Map.of(PROPERTIES, createAdditionalMappings(config, fieldCapabilitiesResponse)); + addedMappings.put(PROPERTIES, createAdditionalMappings(config, fieldCapabilitiesResponse)); + // Also add runtime mappings + addedMappings.put(RUNTIME, config.getSource().getRuntimeMappings()); // Add the mappings to the destination index PutMappingRequest putMappingRequest = @@ -271,22 +283,7 @@ public static void updateMappingsToDestIndex(Client client, listener::onFailure ); - List requiredFields = config.getAnalysis().getRequiredFields(); - if (requiredFields.isEmpty()) { - fieldCapabilitiesListener.onResponse(null); - return; - } - FieldCapabilitiesRequest fieldCapabilitiesRequest = - new FieldCapabilitiesRequest() - .indices(config.getSource().getIndex()) - .fields(requiredFields.stream().map(RequiredField::getName).toArray(String[]::new)); - ClientHelper.executeWithHeadersAsync( - config.getHeaders(), - ML_ORIGIN, - client, - FieldCapabilitiesAction.INSTANCE, - fieldCapabilitiesRequest, - fieldCapabilitiesListener); + getFieldCapsForRequiredFields(client, config, fieldCapabilitiesListener); } private static void checkResultsFieldIsNotPresentInProperties(DataFrameAnalyticsConfig config, Map properties) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java index 194d37f5181d0..998b37de15f42 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java @@ -176,6 +176,8 @@ private SearchRequestBuilder buildSearchRequest() { searchRequestBuilder.addDocValueField(docValueField.getSearchField(), docValueField.getDocValueFormat()); } + searchRequestBuilder.setRuntimeMappings(context.runtimeMappings); + return searchRequestBuilder; } @@ -341,7 +343,8 @@ private SearchRequestBuilder buildDataSummarySearchRequestBuilder() { .setIndices(context.indices) .setSize(0) .setQuery(summaryQuery) - .setTrackTotalHits(true); + .setTrackTotalHits(true) + .setRuntimeMappings(context.runtimeMappings); } private QueryBuilder allExtractedFieldsExistQuery() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorContext.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorContext.java index 496c87d7d9c1d..57fe032c107f4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorContext.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorContext.java @@ -26,9 +26,14 @@ public class DataFrameDataExtractorContext { final boolean supportsRowsWithMissingValues; final TrainTestSplitterFactory trainTestSplitterFactory; + // Runtime mappings are necessary while we are still querying the source indices. + // They should be empty when we're querying the destination index as the runtime + // fields should be mapped in the index. + final Map runtimeMappings; + DataFrameDataExtractorContext(String jobId, ExtractedFields extractedFields, List indices, QueryBuilder query, int scrollSize, Map headers, boolean includeSource, boolean supportsRowsWithMissingValues, - TrainTestSplitterFactory trainTestSplitterFactory) { + TrainTestSplitterFactory trainTestSplitterFactory, Map runtimeMappings) { this.jobId = Objects.requireNonNull(jobId); this.extractedFields = Objects.requireNonNull(extractedFields); this.indices = indices.toArray(new String[indices.size()]); @@ -38,5 +43,6 @@ public class DataFrameDataExtractorContext { this.includeSource = includeSource; this.supportsRowsWithMissingValues = supportsRowsWithMissingValues; this.trainTestSplitterFactory = Objects.requireNonNull(trainTestSplitterFactory); + this.runtimeMappings = Objects.requireNonNull(runtimeMappings); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java index 2213d2d607071..9786e04655e3f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java @@ -35,11 +35,12 @@ public class DataFrameDataExtractorFactory { private final Map headers; private final boolean supportsRowsWithMissingValues; private final TrainTestSplitterFactory trainTestSplitterFactory; + private final Map runtimeMappings; private DataFrameDataExtractorFactory(Client client, String analyticsId, List indices, QueryBuilder sourceQuery, ExtractedFields extractedFields, List requiredFields, Map headers, - boolean supportsRowsWithMissingValues, - TrainTestSplitterFactory trainTestSplitterFactory) { + boolean supportsRowsWithMissingValues, TrainTestSplitterFactory trainTestSplitterFactory, + Map runtimeMappings) { this.client = Objects.requireNonNull(client); this.analyticsId = Objects.requireNonNull(analyticsId); this.indices = Objects.requireNonNull(indices); @@ -49,6 +50,7 @@ private DataFrameDataExtractorFactory(Client client, String analyticsId, List fieldCapsPerType = fieldCapabilitiesResponse.getField(constraint.getField()); if (fieldCapsPerType == null) { @@ -171,6 +174,7 @@ private void getFieldCaps(String[] index, DataFrameAnalyticsConfig config, Actio fieldCapabilitiesRequest.indices(index); fieldCapabilitiesRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); fieldCapabilitiesRequest.fields("*"); + fieldCapabilitiesRequest.runtimeFields(config.getSource().getRuntimeMappings()); LOGGER.debug(() -> new ParameterizedMessage( "[{}] Requesting field caps for index {}", config.getId(), Arrays.toString(index))); ClientHelper.executeWithHeaders(config.getHeaders(), ML_ORIGIN, client, () -> { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java index 98549256a97a8..4e284306840be 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java @@ -463,7 +463,7 @@ private static Answer callListenerOnResponse(Response respo private static DataFrameAnalyticsConfig createConfig(DataFrameAnalysis analysis) { return new DataFrameAnalyticsConfig.Builder() .setId(ANALYTICS_ID) - .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, null)) + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, null, null)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, null)) .setAnalysis(analysis) .build(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java index d3450e8dee0ff..5fec5cce2bbe0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java @@ -251,11 +251,11 @@ public void testMergeMappings_GivenSourceFiltering() { } private static DataFrameAnalyticsSource newSource() { - return new DataFrameAnalyticsSource(new String[] {"index"}, null, null); + return new DataFrameAnalyticsSource(new String[] {"index"}, null, null, null); } private static DataFrameAnalyticsSource newSourceWithExcludes(String... excludes) { return new DataFrameAnalyticsSource(new String[] {"index"}, null, - new FetchSourceContext(true, null, excludes)); + new FetchSourceContext(true, null, excludes), null); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorTests.java index fdba987d715c5..da86a89db7ed8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorTests.java @@ -565,7 +565,7 @@ public void testExtractionWithProcessedFieldThrows() { private TestExtractor createExtractor(boolean includeSource, boolean supportsRowsWithMissingValues) { DataFrameDataExtractorContext context = new DataFrameDataExtractorContext(JOB_ID, extractedFields, indices, query, scrollSize, - headers, includeSource, supportsRowsWithMissingValues, trainTestSplitterFactory); + headers, includeSource, supportsRowsWithMissingValues, trainTestSplitterFactory, Collections.emptyMap()); return new TestExtractor(client, context); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java index 11d7acb386443..a8a52673f1627 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java @@ -1137,7 +1137,7 @@ public void testDetect_withFeatureProcessors() { private DataFrameAnalyticsConfig buildOutlierDetectionConfig() { return new DataFrameAnalyticsConfig.Builder() .setId("foo") - .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering)) + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering, null)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD)) .setAnalyzedFields(analyzedFields) .setAnalysis(new OutlierDetection.Builder().build()) @@ -1151,7 +1151,7 @@ private DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable) private DataFrameAnalyticsConfig buildClassificationConfig(String dependentVariable) { return new DataFrameAnalyticsConfig.Builder() .setId("foo") - .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering)) + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering, null)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD)) .setAnalysis(new Classification(dependentVariable)) .build(); @@ -1160,7 +1160,7 @@ private DataFrameAnalyticsConfig buildClassificationConfig(String dependentVaria private DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable, List featureprocessors) { return new DataFrameAnalyticsConfig.Builder() .setId("foo") - .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering)) + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering, null)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD)) .setAnalyzedFields(analyzedFields) .setAnalysis(new Regression(dependentVariable, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunnerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunnerTests.java index 3e0e1bf931488..deb7b6a667254 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunnerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunnerTests.java @@ -71,7 +71,7 @@ public void setupTests() { config = new DataFrameAnalyticsConfig.Builder() .setId("test") .setAnalysis(RegressionTests.createRandom()) - .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("dest_index", "test_results_field")) .build(); progressTracker = ProgressTracker.fromZeroes(config.getAnalysis().getProgressPhases(), config.getAnalysis().supportsInference()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java index 16a89c2192e4d..9d87ed78ed570 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java @@ -82,7 +82,7 @@ public void setUpMocks() { analyticsConfig = new DataFrameAnalyticsConfig.Builder() .setId(JOB_ID) .setDescription(JOB_DESCRIPTION) - .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("my_dest", null)) .setAnalysis(new Regression("foo")) .build(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersisterTests.java index dd190dc062728..4c87429026df9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersisterTests.java @@ -77,7 +77,7 @@ public void testPersistAllDocs() { DataFrameAnalyticsConfig analyticsConfig = new DataFrameAnalyticsConfig.Builder() .setId(JOB_ID) .setDescription(JOB_DESCRIPTION) - .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null, null)) .setDest(new DataFrameAnalyticsDest("my_dest", null)) .setAnalysis(randomBoolean() ? new Regression("foo") : new Classification("foo")) .build(); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml index 51bd2da2c841c..da0df533886fa 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml @@ -2100,3 +2100,39 @@ setup: }} - is_false: data_frame_analytics.0.create_time - is_false: data_frame_analytics.0.version + +--- +"Test put with runtime mappings": + + - do: + ml.put_data_frame_analytics: + id: "with-runtime-mappings" + body: > + { + "source": { + "index": "index-source", + "runtime_mappings": { + "runtime_field": { + "type": "double", + "script": "" + } + } + }, + "dest": { + "index": "index-dest" + }, + "analysis": { + "outlier_detection": { + } + } + } + - match: { id: "with-runtime-mappings" } + - match: { source.index: ["index-source"] } + - match: { source.runtime_mappings: { + "runtime_field": { + "type": "double", + "script": "" + } + } + } + - match: { dest.index: "index-dest" }