Skip to content

Commit

Permalink
[ML] Add runtime mappings to data frame analytics source config (#69183)
Browse files Browse the repository at this point in the history
Users can now specify runtime mappings as part of the source config
of a data frame analytics job. Those runtime mappings become part of
the mapping of the destination index. This ensures the fields are
accessible in the destination index even if the relevant data frame
analytics job gets deleted.

Closes #65056
  • Loading branch information
dimitris-athanasiou authored Feb 19, 2021
1 parent 623f547 commit 7fb98c0
Show file tree
Hide file tree
Showing 35 changed files with 665 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class DataFrameAnalyticsSource implements ToXContentObject {
Expand All @@ -45,16 +47,20 @@ public static Builder builder() {
(p, c) -> FetchSourceContext.fromXContent(p),
_SOURCE,
ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING);
PARSER.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD);
}

private final String[] index;
private final QueryConfig queryConfig;
private final FetchSourceContext sourceFiltering;
private final Map<String, Object> runtimeMappings;

private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig, @Nullable FetchSourceContext sourceFiltering) {
private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig, @Nullable FetchSourceContext sourceFiltering,
@Nullable Map<String, Object> runtimeMappings) {
this.index = Objects.requireNonNull(index);
this.queryConfig = queryConfig;
this.sourceFiltering = sourceFiltering;
this.runtimeMappings = runtimeMappings;
}

public String[] getIndex() {
Expand All @@ -69,6 +75,10 @@ public FetchSourceContext getSourceFiltering() {
return sourceFiltering;
}

public Map<String, Object> getRuntimeMappings() {
return runtimeMappings;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand All @@ -79,6 +89,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (sourceFiltering != null) {
builder.field(_SOURCE.getPreferredName(), sourceFiltering);
}
if (runtimeMappings != null) {
builder.field(SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName(), runtimeMappings);
}
builder.endObject();
return builder;
}
Expand All @@ -91,12 +104,13 @@ public boolean equals(Object o) {
DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o;
return Arrays.equals(index, other.index)
&& Objects.equals(queryConfig, other.queryConfig)
&& Objects.equals(sourceFiltering, other.sourceFiltering);
&& Objects.equals(sourceFiltering, other.sourceFiltering)
&& Objects.equals(runtimeMappings, other.runtimeMappings);
}

@Override
public int hashCode() {
return Objects.hash(Arrays.asList(index), queryConfig, sourceFiltering);
return Objects.hash(Arrays.asList(index), queryConfig, sourceFiltering, runtimeMappings);
}

@Override
Expand All @@ -109,6 +123,7 @@ public static class Builder {
private String[] index;
private QueryConfig queryConfig;
private FetchSourceContext sourceFiltering;
private Map<String, Object> runtimeMappings;

private Builder() {}

Expand All @@ -132,8 +147,13 @@ public Builder setSourceFiltering(FetchSourceContext sourceFiltering) {
return this;
}

public Builder setRuntimeMappings(Map<String, Object> runtimeMappings) {
this.runtimeMappings = runtimeMappings;
return this;
}

public DataFrameAnalyticsSource build() {
return new DataFrameAnalyticsSource(index, queryConfig, sourceFiltering);
return new DataFrameAnalyticsSource(index, queryConfig, sourceFiltering, runtimeMappings);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2995,13 +2995,16 @@ public void testPutDataFrameAnalytics() throws Exception {
QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
// end::put-data-frame-analytics-query-config

Map<String, Object> runtimeMappings = Collections.emptyMap();

// tag::put-data-frame-analytics-source-config
DataFrameAnalyticsSource sourceConfig = DataFrameAnalyticsSource.builder() // <1>
.setIndex("put-test-source-index") // <2>
.setQueryConfig(queryConfig) // <3>
.setRuntimeMappings(runtimeMappings) // <4>
.setSourceFiltering(new FetchSourceContext(true,
new String[] { "included_field_1", "included_field_2" },
new String[] { "excluded_field" })) // <4>
new String[] { "excluded_field" })) // <5>
.build();
// end::put-data-frame-analytics-source-config

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.elasticsearch.test.AbstractXContentTestCase;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

import static java.util.Collections.emptyList;
Expand All @@ -31,11 +33,19 @@ public static DataFrameAnalyticsSource randomSourceConfig() {
generateRandomStringArray(10, 10, false, false),
generateRandomStringArray(10, 10, false, false));
}

Map<String, Object> runtimeMappings = null;
if (randomBoolean()) {
runtimeMappings = new HashMap<>();
Map<String, Object> runtimeField = new HashMap<>();
runtimeField.put("type", "keyword");
runtimeField.put("script", "");
runtimeMappings.put(randomAlphaOfLength(10), runtimeField);
}
return DataFrameAnalyticsSource.builder()
.setIndex(generateRandomStringArray(10, 10, false, false))
.setQueryConfig(randomBoolean() ? null : randomQueryConfig())
.setSourceFiltering(sourceFiltering)
.setRuntimeMappings(runtimeMappings)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ include-tagged::{doc-tests-file}[{api}-source-config]
<1> Constructing a new DataFrameAnalyticsSource
<2> The source index
<3> The query from which to gather the data. If query is not set, a `match_all` query is used by default.
<4> Source filtering to select which fields will exist in the destination index.
<4> Runtime mappings that will be added to the destination index mapping.
<5> Source filtering to select which fields will exist in the destination index.

===== QueryConfig

Expand Down
6 changes: 5 additions & 1 deletion docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ setting, an error occurs when you try to create {dfanalytics-jobs} that have
`source`::
(object)
The configuration of how to source the analysis data. It requires an `index`.
Optionally, `query` and `_source` may be specified.
Optionally, `query`, `runtime_mappings`, and `_source` may be specified.
+
.Properties of `source`
[%collapsible%open]
Expand All @@ -543,6 +543,10 @@ options that are supported by {es} can be used, as this object is passed
verbatim to {es}. By default, this property has the following value:
`{"match_all": {}}`.
`runtime_mappings`:::
(Optional, object) Definitions of runtime fields that will become part of the
mapping of the destination index.
`_source`:::
(Optional, object) Specify `includes` and/or `excludes` patterns to select which
fields will be present in the destination. Fields that are excluded cannot be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
* A search action request builder.
Expand Down Expand Up @@ -592,4 +593,12 @@ public SearchRequestBuilder setPreFilterShardSize(int preFilterShardSize) {
this.request.setPreFilterShardSize(preFilterShardSize);
return this;
}

/**
* Set runtime mappings to create runtime fields that exist only as part of this particular search.
*/
public SearchRequestBuilder setRuntimeMappings(Map<String, Object> runtimeMappings) {
sourceBuilder().runtimeMappings(runtimeMappings);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
import org.elasticsearch.xpack.core.ml.utils.RuntimeMappingsValidator;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer;

Expand Down Expand Up @@ -825,7 +826,7 @@ public DatafeedConfig build() {
}

validateScriptFields();
validateRuntimeMappings();
RuntimeMappingsValidator.validate(runtimeMappings);
setDefaultChunkingConfig();

setDefaultQueryDelay();
Expand All @@ -846,28 +847,6 @@ void validateScriptFields() {
}
}

/**
* Perform a light check that the structure resembles runtime_mappings.
* The full check cannot happen until search
*/
void validateRuntimeMappings() {
for (Map.Entry<String, Object> entry : runtimeMappings.entrySet()) {
// top level objects are fields
String fieldName = entry.getKey();
if (entry.getValue() instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> propNode = new HashMap<>(((Map<String, Object>) entry.getValue()));
Object typeNode = propNode.get("type");
if (typeNode == null) {
throw ExceptionsHelper.badRequestException("No type specified for runtime field [" + fieldName + "]");
}
} else {
throw ExceptionsHelper.badRequestException("Expected map for runtime field [" + fieldName + "] " +
"definition but got a " + fieldName.getClass().getSimpleName());
}
}
}

private static void checkNoMoreHistogramAggregations(Collection<AggregationBuilder> aggregations) {
for (AggregationBuilder agg : aggregations) {
if (ExtractorUtils.isHistogram(agg)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
import org.elasticsearch.xpack.core.ml.utils.RuntimeMappingsValidator;
import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -45,22 +49,27 @@ public static ConstructingObjectParser<DataFrameAnalyticsSource, Void> createPar
ignoreUnknownFields, a -> new DataFrameAnalyticsSource(
((List<String>) a[0]).toArray(new String[0]),
(QueryProvider) a[1],
(FetchSourceContext) a[2]));
(FetchSourceContext) a[2],
(Map<String, Object>) a[3]));
parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDEX);
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> QueryProvider.fromXContent(p, ignoreUnknownFields, Messages.DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT), QUERY);
parser.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> FetchSourceContext.fromXContent(p),
_SOURCE,
ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING);
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(),
SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD);
return parser;
}

private final String[] index;
private final QueryProvider queryProvider;
private final FetchSourceContext sourceFiltering;
private final Map<String, Object> runtimeMappings;

public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider, @Nullable FetchSourceContext sourceFiltering) {
public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider, @Nullable FetchSourceContext sourceFiltering,
@Nullable Map<String, Object> runtimeMappings) {
this.index = ExceptionsHelper.requireNonNull(index, INDEX);
if (index.length == 0) {
throw new IllegalArgumentException("source.index must specify at least one index");
Expand All @@ -73,6 +82,8 @@ public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryPro
throw new IllegalArgumentException("source._source cannot be disabled");
}
this.sourceFiltering = sourceFiltering;
this.runtimeMappings = runtimeMappings == null ? Collections.emptyMap() : Collections.unmodifiableMap(runtimeMappings);
RuntimeMappingsValidator.validate(this.runtimeMappings);
}

public DataFrameAnalyticsSource(StreamInput in) throws IOException {
Expand All @@ -83,13 +94,19 @@ public DataFrameAnalyticsSource(StreamInput in) throws IOException {
} else {
sourceFiltering = null;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
runtimeMappings = in.readMap();
} else {
runtimeMappings = Collections.emptyMap();
}
}

public DataFrameAnalyticsSource(DataFrameAnalyticsSource other) {
this.index = Arrays.copyOf(other.index, other.index.length);
this.queryProvider = new QueryProvider(other.queryProvider);
this.sourceFiltering = other.sourceFiltering == null ? null : new FetchSourceContext(
other.sourceFiltering.fetchSource(), other.sourceFiltering.includes(), other.sourceFiltering.excludes());
this.runtimeMappings = Collections.unmodifiableMap(new HashMap<>(other.runtimeMappings));
}

@Override
Expand All @@ -99,6 +116,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeOptionalWriteable(sourceFiltering);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeMap(runtimeMappings);
}
}

@Override
Expand All @@ -109,6 +129,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (sourceFiltering != null) {
builder.field(_SOURCE.getPreferredName(), sourceFiltering);
}
if (runtimeMappings.isEmpty() == false) {
builder.field(SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName(), runtimeMappings);
}
builder.endObject();
return builder;
}
Expand All @@ -121,12 +144,13 @@ public boolean equals(Object o) {
DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o;
return Arrays.equals(index, other.index)
&& Objects.equals(queryProvider, other.queryProvider)
&& Objects.equals(sourceFiltering, other.sourceFiltering);
&& Objects.equals(sourceFiltering, other.sourceFiltering)
&& Objects.equals(runtimeMappings, other.runtimeMappings);
}

@Override
public int hashCode() {
return Objects.hash(Arrays.asList(index), queryProvider, sourceFiltering);
return Objects.hash(Arrays.asList(index), queryProvider, sourceFiltering, runtimeMappings);
}

public String[] getIndex() {
Expand Down Expand Up @@ -189,6 +213,10 @@ Map<String, Object> getQuery() {
return queryProvider.getQuery();
}

public Map<String, Object> getRuntimeMappings() {
return runtimeMappings;
}

public boolean isFieldExcluded(String path) {
if (sourceFiltering == null) {
return false;
Expand Down
Loading

0 comments on commit 7fb98c0

Please sign in to comment.