From 004388f0f0387a9a988d29459937f3b03e5c46db Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 29 Jul 2020 14:00:18 +0200 Subject: [PATCH] [Transform] add support for missing bucket (#59591) add support for "missing_bucket" in group_by fixes #42941 fixes #55102 --- .../pivot/DateHistogramGroupSource.java | 35 ++- .../transforms/pivot/GeoTileGroupSource.java | 72 ++++++- .../pivot/HistogramGroupSource.java | 31 ++- .../transforms/pivot/SingleGroupSource.java | 18 +- .../transforms/pivot/TermsGroupSource.java | 24 ++- .../pivot/DateHistogramGroupSourceTests.java | 8 +- .../pivot/GeoTileGroupSourceTests.java | 11 +- .../transforms/pivot/GroupConfigTests.java | 20 +- .../pivot/HistogramGroupSourceTests.java | 3 +- .../pivot/TermsGroupSourceTests.java | 2 +- .../hlrc/DateHistogramGroupSourceTests.java | 2 + .../pivot/hlrc/GeoTileGroupSourceTests.java | 12 +- .../pivot/hlrc/HistogramGroupSourceTests.java | 4 +- .../pivot/hlrc/TermsGroupSourceTests.java | 4 +- .../pivot/DateHistogramGroupSource.java | 21 +- .../transforms/pivot/GeoTileGroupSource.java | 17 +- .../pivot/HistogramGroupSource.java | 16 +- .../transforms/pivot/SingleGroupSource.java | 27 ++- .../transforms/pivot/TermsGroupSource.java | 7 +- .../UpdateTransformsActionResponseTests.java | 4 +- .../transforms/TransformConfigTests.java | 18 +- .../pivot/DateHistogramGroupSourceTests.java | 23 +- .../pivot/GeoTileGroupSourceTests.java | 17 +- .../transforms/pivot/GroupConfigTests.java | 13 +- .../pivot/HistogramGroupSourceTests.java | 13 +- .../transforms/pivot/PivotConfigTests.java | 13 +- .../pivot/TermsGroupSourceTests.java | 13 +- .../integration/TransformPivotRestIT.java | 84 ++++---- .../integration/TransformProgressIT.java | 54 +++-- .../integration/TransformRestTestCase.java | 173 ++++++++------- .../TransformTaskFailedStateIT.java | 4 +- .../xpack/transform/transforms/Function.java | 7 + .../transforms/TransformIndexer.java | 15 ++ .../CompositeBucketsChangeCollector.java | 202 ++++++++++++++++-- .../transform/transforms/pivot/Pivot.java | 14 +- .../CompositeBucketsChangeCollectorTests.java | 2 +- 36 files changed, 742 insertions(+), 261 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/DateHistogramGroupSource.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/DateHistogramGroupSource.java index 02afa282a2fc0..1fe7be0798a8a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/DateHistogramGroupSource.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/DateHistogramGroupSource.java @@ -188,9 +188,10 @@ public int hashCode() { (args) -> { String field = (String) args[0]; Script script = (Script) args[1]; - String fixedInterval = (String) args[2]; - String calendarInterval = (String) args[3]; - ZoneId zoneId = (ZoneId) args[4]; + boolean missingBucket = args[2] == null ? 
false : (boolean) args[2]; + String fixedInterval = (String) args[3]; + String calendarInterval = (String) args[4]; + ZoneId zoneId = (ZoneId) args[5]; Interval interval = null; @@ -204,13 +205,14 @@ public int hashCode() { throw new IllegalArgumentException("You must specify either fixed_interval or calendar_interval, found none"); } - return new DateHistogramGroupSource(field, script, interval, zoneId); + return new DateHistogramGroupSource(field, script, missingBucket, interval, zoneId); } ); static { PARSER.declareString(optionalConstructorArg(), FIELD); Script.declareScript(PARSER, optionalConstructorArg(), SCRIPT); + PARSER.declareBoolean(optionalConstructorArg(), MISSING_BUCKET); PARSER.declareString(optionalConstructorArg(), new ParseField(FixedInterval.NAME)); PARSER.declareString(optionalConstructorArg(), new ParseField(CalendarInterval.NAME)); @@ -231,7 +233,11 @@ public static DateHistogramGroupSource fromXContent(final XContentParser parser) private final ZoneId timeZone; DateHistogramGroupSource(String field, Script script, Interval interval, ZoneId timeZone) { - super(field, script); + this(field, script, false, interval, timeZone); + } + + DateHistogramGroupSource(String field, Script script, boolean missingBucket, Interval interval, ZoneId timeZone) { + super(field, script, missingBucket); this.interval = interval; this.timeZone = timeZone; } @@ -273,14 +279,16 @@ public boolean equals(Object other) { final DateHistogramGroupSource that = (DateHistogramGroupSource) other; - return Objects.equals(this.field, that.field) + return this.missingBucket == that.missingBucket + && Objects.equals(this.field, that.field) + && Objects.equals(this.script, that.script) && Objects.equals(this.interval, that.interval) && Objects.equals(this.timeZone, that.timeZone); } @Override public int hashCode() { - return Objects.hash(field, interval, timeZone); + return Objects.hash(field, script, missingBucket, interval, timeZone); } @Override @@ -298,6 +306,7 @@ public static class Builder { private Script script; private Interval interval; private ZoneId timeZone; + private boolean missingBucket; /** * The field with which to construct the date histogram grouping @@ -339,8 +348,18 @@ public Builder setTimeZone(ZoneId timeZone) { return this; } + /** + * Sets the value of "missing_bucket" + * @param missingBucket value of "missing_bucket" to be set + * @return The {@link Builder} with "missing_bucket" set. 
+ */ + public Builder setMissingBucket(boolean missingBucket) { + this.missingBucket = missingBucket; + return this; + } + public DateHistogramGroupSource build() { - return new DateHistogramGroupSource(field, script, interval, timeZone); + return new DateHistogramGroupSource(field, script, missingBucket, interval, timeZone); } } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/GeoTileGroupSource.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/GeoTileGroupSource.java index 8c696bc968486..95ea8f25def10 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/GeoTileGroupSource.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/GeoTileGroupSource.java @@ -40,16 +40,18 @@ public class GeoTileGroupSource extends SingleGroupSource implements ToXContentO private static final String NAME = "transform_geo_tile_group"; private static final ParseField PRECISION = new ParseField("precision"); - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, (args) -> { + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, (args) -> { String field = (String) args[0]; - Integer precision = (Integer) args[1]; - GeoBoundingBox boundingBox = (GeoBoundingBox) args[2]; + boolean missingBucket = args[1] == null ? false : (boolean) args[1]; + Integer precision = (Integer) args[2]; + GeoBoundingBox boundingBox = (GeoBoundingBox) args[3]; - return new GeoTileGroupSource(field, precision, boundingBox); + return new GeoTileGroupSource(field, missingBucket, precision, boundingBox); }); static { PARSER.declareString(optionalConstructorArg(), FIELD); + PARSER.declareBoolean(optionalConstructorArg(), MISSING_BUCKET); PARSER.declareInt(optionalConstructorArg(), PRECISION); PARSER.declareField( optionalConstructorArg(), @@ -62,7 +64,11 @@ public class GeoTileGroupSource extends SingleGroupSource implements ToXContentO private final GeoBoundingBox geoBoundingBox; public GeoTileGroupSource(final String field, final Integer precision, final GeoBoundingBox boundingBox) { - super(field, null); + this(field, false, precision, boundingBox); + } + + public GeoTileGroupSource(final String field, final boolean missingBucket, final Integer precision, final GeoBoundingBox boundingBox) { + super(field, null, missingBucket); if (precision != null) { GeoTileUtils.checkPrecisionRange(precision); } @@ -113,14 +119,66 @@ public boolean equals(Object other) { final GeoTileGroupSource that = (GeoTileGroupSource) other; - return Objects.equals(this.field, that.field) + return this.missingBucket == that.missingBucket + && Objects.equals(this.field, that.field) && Objects.equals(this.precision, that.precision) && Objects.equals(this.geoBoundingBox, that.geoBoundingBox); } @Override public int hashCode() { - return Objects.hash(field, precision, geoBoundingBox); + return Objects.hash(field, missingBucket, precision, geoBoundingBox); } + public static class Builder { + + private String field; + private boolean missingBucket; + private Integer precision; + private GeoBoundingBox boundingBox; + + /** + * The field with which to construct the geo tile grouping + * @param field The field name + * @return The {@link Builder} with the field set. 
+ */ + public Builder setField(String field) { + this.field = field; + return this; + } + + /** + * Sets the value of "missing_bucket" + * @param missingBucket value of "missing_bucket" to be set + * @return The {@link Builder} with "missing_bucket" set. + */ + public Builder setMissingBucket(boolean missingBucket) { + this.missingBucket = missingBucket; + return this; + } + + /** + * The precision with which to construct the geo tile grouping + * @param precision The precision + * @return The {@link Builder} with the precision set. + */ + public Builder setPrecission(Integer precision) { + this.precision = precision; + return this; + } + + /** + * Set the bounding box for the geo tile grouping + * @param boundingBox The bounding box + * @return the {@link Builder} with the bounding box set. + */ + public Builder setBoundingBox(GeoBoundingBox boundingBox) { + this.boundingBox = boundingBox; + return this; + } + + public GeoTileGroupSource build() { + return new GeoTileGroupSource(field, missingBucket, precision, boundingBox); + } + } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/HistogramGroupSource.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/HistogramGroupSource.java index c6d89c78c554e..28a3fbbd0c016 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/HistogramGroupSource.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/HistogramGroupSource.java @@ -41,12 +41,13 @@ public class HistogramGroupSource extends SingleGroupSource implements ToXConten private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "histogram_group_source", true, - args -> new HistogramGroupSource((String) args[0], (Script) args[1], (double) args[2]) + args -> new HistogramGroupSource((String) args[0], (Script) args[1], args[2] == null ? 
false : (boolean) args[2], (double) args[3]) ); static { PARSER.declareString(optionalConstructorArg(), FIELD); Script.declareScript(PARSER, optionalConstructorArg(), SCRIPT); + PARSER.declareBoolean(optionalConstructorArg(), MISSING_BUCKET); PARSER.declareDouble(optionalConstructorArg(), INTERVAL); } @@ -57,7 +58,11 @@ public static HistogramGroupSource fromXContent(final XContentParser parser) { private final double interval; HistogramGroupSource(String field, Script script, double interval) { - super(field, script); + this(field, script, false, interval); + } + + HistogramGroupSource(String field, Script script, boolean missingBucket, double interval) { + super(field, script, missingBucket); if (interval <= 0) { throw new IllegalArgumentException("[interval] must be greater than 0."); } @@ -94,12 +99,15 @@ public boolean equals(Object other) { final HistogramGroupSource that = (HistogramGroupSource) other; - return Objects.equals(this.field, that.field) && Objects.equals(this.interval, that.interval); + return this.missingBucket == that.missingBucket + && Objects.equals(this.field, that.field) + && Objects.equals(this.script, that.script) + && Objects.equals(this.interval, that.interval); } @Override public int hashCode() { - return Objects.hash(field, interval); + return Objects.hash(field, script, interval, missingBucket); } public static Builder builder() { @@ -110,6 +118,7 @@ public static class Builder { private String field; private Script script; + private boolean missingBucket; private double interval; /** @@ -123,7 +132,7 @@ public Builder setField(String field) { } /** - * Set the interval for the histogram aggregation + * Set the interval for the histogram grouping * @param interval The numeric interval for the histogram grouping * @return The {@link Builder} with the interval set. */ @@ -142,8 +151,18 @@ public Builder setScript(Script script) { return this; } + /** + * Sets the value of "missing_bucket" + * @param missingBucket value of "missing_bucket" to be set + * @return The {@link Builder} with "missing_bucket" set. 
+ */ + public Builder setMissingBucket(boolean missingBucket) { + this.missingBucket = missingBucket; + return this; + } + public HistogramGroupSource build() { - return new HistogramGroupSource(field, script, interval); + return new HistogramGroupSource(field, script, missingBucket, interval); } } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/SingleGroupSource.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/SingleGroupSource.java index ef5edcf328189..d0a9bb3ee4ff1 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/SingleGroupSource.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/SingleGroupSource.java @@ -32,6 +32,7 @@ public abstract class SingleGroupSource implements ToXContentObject { protected static final ParseField FIELD = new ParseField("field"); protected static final ParseField SCRIPT = new ParseField("script"); + protected static final ParseField MISSING_BUCKET = new ParseField("missing_bucket"); public enum Type { TERMS, @@ -46,10 +47,12 @@ public String value() { protected final String field; protected final Script script; + protected final boolean missingBucket; - public SingleGroupSource(final String field, final Script script) { + public SingleGroupSource(final String field, final Script script, final boolean missingBucket) { this.field = field; this.script = script; + this.missingBucket = missingBucket; } public abstract Type getType(); @@ -62,6 +65,10 @@ public Script getScript() { return script; } + public boolean getMissingBucket() { + return missingBucket; + } + protected void innerXContent(XContentBuilder builder, Params params) throws IOException { if (field != null) { builder.field(FIELD.getPreferredName(), field); @@ -69,6 +76,9 @@ protected void innerXContent(XContentBuilder builder, Params params) throws IOEx if (script != null) { builder.field(SCRIPT.getPreferredName(), script); } + if (missingBucket) { + builder.field(MISSING_BUCKET.getPreferredName(), missingBucket); + } } @Override @@ -83,11 +93,13 @@ public boolean equals(Object other) { final SingleGroupSource that = (SingleGroupSource) other; - return Objects.equals(this.field, that.field) && Objects.equals(this.script, that.script); + return this.missingBucket == that.missingBucket + && Objects.equals(this.field, that.field) + && Objects.equals(this.script, that.script); } @Override public int hashCode() { - return Objects.hash(field, script); + return Objects.hash(field, script, missingBucket); } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/TermsGroupSource.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/TermsGroupSource.java index 948d109c2d84c..4a977466cfbfa 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/TermsGroupSource.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/TermsGroupSource.java @@ -35,12 +35,13 @@ public class TermsGroupSource extends SingleGroupSource implements ToXContentObj private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "terms_group_source", true, - args -> new TermsGroupSource((String) args[0], (Script) args[1]) + args -> new TermsGroupSource((String) args[0], (Script) args[1], args[2] == null ? 
false : (boolean) args[2]) ); static { PARSER.declareString(optionalConstructorArg(), FIELD); Script.declareScript(PARSER, optionalConstructorArg(), SCRIPT); + PARSER.declareBoolean(optionalConstructorArg(), MISSING_BUCKET); } public static TermsGroupSource fromXContent(final XContentParser parser) { @@ -48,7 +49,11 @@ public static TermsGroupSource fromXContent(final XContentParser parser) { } TermsGroupSource(final String field, final Script script) { - super(field, script); + this(field, script, false); + } + + TermsGroupSource(final String field, final Script script, final boolean missingBucket) { + super(field, script, missingBucket); } @Override @@ -72,9 +77,10 @@ public static class Builder { private String field; private Script script; + private boolean missingBucket; /** - * The field with which to construct the date histogram grouping + * The field with which to construct the terms grouping * @param field The field name * @return The {@link Builder} with the field set. */ @@ -93,8 +99,18 @@ public Builder setScript(Script script) { return this; } + /** + * Sets the value of "missing_bucket" + * @param missingBucket value of "missing_bucket" to be set + * @return The {@link Builder} with "missing_bucket" set. + */ + public Builder setMissingBucket(boolean missingBucket) { + this.missingBucket = missingBucket; + return this; + } + public TermsGroupSource build() { - return new TermsGroupSource(field, script); + return new TermsGroupSource(field, script, missingBucket); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/DateHistogramGroupSourceTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/DateHistogramGroupSourceTests.java index 0e29763d07831..2a4484574353f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/DateHistogramGroupSourceTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/DateHistogramGroupSourceTests.java @@ -41,7 +41,13 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() { String field = randomAlphaOfLengthBetween(1, 20); Script script = randomBoolean() ? new Script(randomAlphaOfLengthBetween(1, 10)) : null; - return new DateHistogramGroupSource(field, script, randomDateHistogramInterval(), randomBoolean() ? randomZone() : null); + return new DateHistogramGroupSource( + field, + script, + randomBoolean(), + randomDateHistogramInterval(), + randomBoolean() ? randomZone() : null + ); } @Override diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/GeoTileGroupSourceTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/GeoTileGroupSourceTests.java index 97776f566110d..ee7d198a9a225 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/GeoTileGroupSourceTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/GeoTileGroupSourceTests.java @@ -36,11 +36,14 @@ public static GeoTileGroupSource randomGeoTileGroupSource() { Rectangle rectangle = GeometryTestUtils.randomRectangle(); return new GeoTileGroupSource( randomBoolean() ? null : randomAlphaOfLength(10), + randomBoolean(), randomBoolean() ? null : randomIntBetween(1, GeoTileUtils.MAX_ZOOM), - randomBoolean() ? 
null : new GeoBoundingBox( - new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()), - new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon()) - ) + randomBoolean() + ? null + : new GeoBoundingBox( + new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()), + new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon()) + ) ); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/GroupConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/GroupConfigTests.java index 50a60d573d1f6..39082d3a80b87 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/GroupConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/GroupConfigTests.java @@ -45,7 +45,7 @@ public static GroupConfig randomGroupConfig() { for (int i = 0; i < randomIntBetween(1, 4); ++i) { String targetFieldName = randomAlphaOfLengthBetween(1, 20); if (names.add(targetFieldName)) { - SingleGroupSource groupBy; + SingleGroupSource groupBy = null; SingleGroupSource.Type type = randomFrom(SingleGroupSource.Type.values()); switch (type) { case TERMS: @@ -58,8 +58,10 @@ public static GroupConfig randomGroupConfig() { groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource(); break; case GEOTILE_GRID: - default: groupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource(); + break; + default: + fail("unknown group source type, please implement tests and add support here"); } groups.put(targetFieldName, groupBy); } @@ -109,8 +111,11 @@ public void testLenientParsing() throws IOException { + " ]" + "}" ); - XContentParser parser = JsonXContent.jsonXContent - .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json.streamInput()); + XContentParser parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + json.streamInput() + ); GroupConfig gc = GroupConfig.fromXContent(parser); @@ -138,8 +143,11 @@ public void testLenientParsingUnknowGroupType() throws IOException { + " }" + "}" ); - XContentParser parser = JsonXContent.jsonXContent - .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json.streamInput()); + XContentParser parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + json.streamInput() + ); GroupConfig gc = GroupConfig.fromXContent(parser); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/HistogramGroupSourceTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/HistogramGroupSourceTests.java index 1ddf904fd95b4..3b7290cefb81b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/HistogramGroupSourceTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/HistogramGroupSourceTests.java @@ -31,8 +31,9 @@ public class HistogramGroupSourceTests extends AbstractXContentTestCase createPa ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, (args) -> { String field = (String) args[0]; ScriptConfig scriptConfig = (ScriptConfig) args[1]; - String fixedInterval = (String) args[2]; - String calendarInterval = (String) args[3]; - ZoneId zoneId = (ZoneId) args[4]; + boolean 
missingBucket = args[2] == null ? false : (boolean) args[2]; + String fixedInterval = (String) args[3]; + String calendarInterval = (String) args[4]; + ZoneId zoneId = (ZoneId) args[5]; Interval interval = null; @@ -261,7 +262,7 @@ private static ConstructingObjectParser createPa throw new IllegalArgumentException("You must specify either fixed_interval or calendar_interval, found none"); } - return new DateHistogramGroupSource(field, scriptConfig, interval, zoneId); + return new DateHistogramGroupSource(field, scriptConfig, missingBucket, interval, zoneId); }); declareValuesSourceFields(parser, lenient); @@ -336,12 +337,16 @@ public boolean equals(Object other) { final DateHistogramGroupSource that = (DateHistogramGroupSource) other; - return Objects.equals(this.field, that.field) && Objects.equals(interval, that.interval) && Objects.equals(timeZone, that.timeZone); + return this.missingBucket == that.missingBucket + && Objects.equals(this.field, that.field) + && Objects.equals(this.scriptConfig, that.scriptConfig) + && Objects.equals(this.interval, that.interval) + && Objects.equals(this.timeZone, that.timeZone); } @Override public int hashCode() { - return Objects.hash(field, interval, timeZone); + return Objects.hash(field, scriptConfig, missingBucket, interval, timeZone); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSource.java index 32c988662253d..b03737fdba17b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSource.java @@ -46,12 +46,14 @@ public class GeoTileGroupSource extends SingleGroupSource { private static ConstructingObjectParser createParser(boolean lenient) { ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, (args) -> { String field = (String) args[0]; - Integer precision = (Integer) args[1]; - GeoBoundingBox boundingBox = (GeoBoundingBox) args[2]; + boolean missingBucket = args[1] == null ? 
false : (boolean) args[1]; + Integer precision = (Integer) args[2]; + GeoBoundingBox boundingBox = (GeoBoundingBox) args[3]; - return new GeoTileGroupSource(field, precision, boundingBox); + return new GeoTileGroupSource(field, missingBucket, precision, boundingBox); }); parser.declareString(optionalConstructorArg(), FIELD); + parser.declareBoolean(optionalConstructorArg(), MISSING_BUCKET); parser.declareInt(optionalConstructorArg(), PRECISION); parser.declareField( optionalConstructorArg(), @@ -65,8 +67,8 @@ private static ConstructingObjectParser createParser(b private final Integer precision; private final GeoBoundingBox geoBoundingBox; - public GeoTileGroupSource(final String field, final Integer precision, final GeoBoundingBox boundingBox) { - super(field, null); + public GeoTileGroupSource(final String field, final boolean missingBucket, final Integer precision, final GeoBoundingBox boundingBox) { + super(field, null, missingBucket); if (precision != null) { GeoTileUtils.checkPrecisionRange(precision); } @@ -135,14 +137,15 @@ public boolean equals(Object other) { final GeoTileGroupSource that = (GeoTileGroupSource) other; - return Objects.equals(this.field, that.field) + return this.missingBucket == that.missingBucket + && Objects.equals(this.field, that.field) && Objects.equals(this.precision, that.precision) && Objects.equals(this.geoBoundingBox, that.geoBoundingBox); } @Override public int hashCode() { - return Objects.hash(field, precision, geoBoundingBox); + return Objects.hash(field, missingBucket, precision, geoBoundingBox); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java index 6cb65bea0756c..bd4e7c10fd652 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java @@ -25,8 +25,8 @@ public class HistogramGroupSource extends SingleGroupSource { private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); private final double interval; - public HistogramGroupSource(String field, ScriptConfig scriptConfig, double interval) { - super(field, scriptConfig); + public HistogramGroupSource(String field, ScriptConfig scriptConfig, boolean missingBucket, double interval) { + super(field, scriptConfig, missingBucket); if (interval <= 0) { throw new IllegalArgumentException("[interval] must be greater than 0."); } @@ -42,8 +42,9 @@ private static ConstructingObjectParser createParser ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, (args) -> { String field = (String) args[0]; ScriptConfig scriptConfig = (ScriptConfig) args[1]; - double interval = (double) args[2]; - return new HistogramGroupSource(field, scriptConfig, interval); + boolean missingBucket = args[2] == null ? 
false : (boolean) args[2]; + double interval = (double) args[3]; + return new HistogramGroupSource(field, scriptConfig, missingBucket, interval); }); declareValuesSourceFields(parser, lenient); parser.declareDouble(optionalConstructorArg(), INTERVAL); @@ -90,12 +91,15 @@ public boolean equals(Object other) { final HistogramGroupSource that = (HistogramGroupSource) other; - return Objects.equals(this.field, that.field) && Objects.equals(this.interval, that.interval); + return this.missingBucket == that.missingBucket + && Objects.equals(this.field, that.field) + && Objects.equals(this.scriptConfig, that.scriptConfig) + && Objects.equals(this.interval, that.interval); } @Override public int hashCode() { - return Objects.hash(field, interval); + return Objects.hash(field, scriptConfig, interval); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java index c08b0e6c1d2b9..ea06cd551425c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java @@ -66,18 +66,22 @@ public String value() { protected static final ParseField FIELD = new ParseField("field"); protected static final ParseField SCRIPT = new ParseField("script"); + protected static final ParseField MISSING_BUCKET = new ParseField("missing_bucket"); protected final String field; protected final ScriptConfig scriptConfig; + protected final boolean missingBucket; static void declareValuesSourceFields(AbstractObjectParser parser, boolean lenient) { parser.declareString(optionalConstructorArg(), FIELD); parser.declareObject(optionalConstructorArg(), (p, c) -> ScriptConfig.fromXContent(p, lenient), SCRIPT); + parser.declareBoolean(optionalConstructorArg(), MISSING_BUCKET); } - public SingleGroupSource(final String field, final ScriptConfig scriptConfig) { + public SingleGroupSource(final String field, final ScriptConfig scriptConfig, final boolean missingBucket) { this.field = field; this.scriptConfig = scriptConfig; + this.missingBucket = missingBucket; } public SingleGroupSource(StreamInput in) throws IOException { @@ -87,6 +91,11 @@ public SingleGroupSource(StreamInput in) throws IOException { } else { scriptConfig = null; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_10_0 + missingBucket = in.readBoolean(); + } else { + missingBucket = false; + } } @Override @@ -104,6 +113,9 @@ protected void innerXContent(XContentBuilder builder, Params params) throws IOEx if (scriptConfig != null) { builder.field(SCRIPT.getPreferredName(), scriptConfig); } + if (missingBucket) { + builder.field(MISSING_BUCKET.getPreferredName(), missingBucket); + } } @Override @@ -112,6 +124,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_7_0)) { out.writeOptionalWriteable(scriptConfig); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_10_0 + out.writeBoolean(missingBucket); + } } public abstract Type getType(); @@ -126,6 +141,10 @@ public ScriptConfig getScriptConfig() { return scriptConfig; } + public boolean getMissingBucket() { + return missingBucket; + } + @Override public boolean equals(Object other) { if (this == other) { @@ -138,12 +157,14 @@ public boolean equals(Object other) { final 
SingleGroupSource that = (SingleGroupSource) other; - return Objects.equals(this.field, that.field) && Objects.equals(this.scriptConfig, that.scriptConfig); + return this.missingBucket == that.missingBucket + && Objects.equals(this.field, that.field) + && Objects.equals(this.scriptConfig, that.scriptConfig); } @Override public int hashCode() { - return Objects.hash(field, scriptConfig); + return Objects.hash(field, scriptConfig, missingBucket); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java index a5657556d1bbf..76a4ae1c4ba4d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java @@ -25,16 +25,17 @@ private static ConstructingObjectParser createParser(boo ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, (args) -> { String field = (String) args[0]; ScriptConfig scriptConfig = (ScriptConfig) args[1]; + boolean missingBucket = args[2] == null ? false : (boolean) args[2]; - return new TermsGroupSource(field, scriptConfig); + return new TermsGroupSource(field, scriptConfig, missingBucket); }); SingleGroupSource.declareValuesSourceFields(parser, lenient); return parser; } - public TermsGroupSource(final String field, final ScriptConfig scriptConfig) { - super(field, scriptConfig); + public TermsGroupSource(final String field, final ScriptConfig scriptConfig, boolean missingBucket) { + super(field, scriptConfig, missingBucket); } public TermsGroupSource(StreamInput in) throws IOException { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformsActionResponseTests.java index 302c91e2b80e0..d8ee90eb8adc7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformsActionResponseTests.java @@ -34,7 +34,9 @@ protected Response doParseInstance(XContentParser parser) throws IOException { } public void testBWCPre78() throws IOException { - Response newResponse = createTestInstance(); + Response newResponse = new Response( + TransformConfigTests.randomTransformConfigWithoutHeaders(Version.V_7_8_0, randomAlphaOfLengthBetween(1, 10)) + ); UpdateTransformActionPre78.Response oldResponse = writeAndReadBWCObject( newResponse, getNamedWriteableRegistry(), diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigTests.java index bb05c2f765dd0..697c723f6b8f4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigTests.java @@ -39,11 +39,11 @@ public static TransformConfig randomTransformConfigWithoutHeaders() { return randomTransformConfigWithoutHeaders(randomAlphaOfLengthBetween(1, 10)); } - public static TransformConfig 
randomTransformConfig() { - return randomTransformConfig(randomAlphaOfLengthBetween(1, 10)); + public static TransformConfig randomTransformConfigWithoutHeaders(String id) { + return randomTransformConfigWithoutHeaders(Version.CURRENT, id); } - public static TransformConfig randomTransformConfigWithoutHeaders(String id) { + public static TransformConfig randomTransformConfigWithoutHeaders(Version version, String id) { return new TransformConfig( id, randomSourceConfig(), @@ -51,7 +51,7 @@ public static TransformConfig randomTransformConfigWithoutHeaders(String id) { randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), randomBoolean() ? null : randomSyncConfig(), null, - PivotConfigTests.randomPivotConfig(), + PivotConfigTests.randomPivotConfig(version), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), SettingsConfigTests.randomSettingsConfig(), null, @@ -59,7 +59,15 @@ public static TransformConfig randomTransformConfigWithoutHeaders(String id) { ); } + public static TransformConfig randomTransformConfig() { + return randomTransformConfig(randomAlphaOfLengthBetween(1, 10)); + } + public static TransformConfig randomTransformConfig(String id) { + return randomTransformConfig(Version.CURRENT, id); + } + + public static TransformConfig randomTransformConfig(Version version, String id) { return new TransformConfig( id, randomSourceConfig(), @@ -67,7 +75,7 @@ public static TransformConfig randomTransformConfig(String id) { randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), randomBoolean() ? null : randomSyncConfig(), randomHeaders(), - PivotConfigTests.randomPivotConfig(), + PivotConfigTests.randomPivotConfig(version), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), randomBoolean() ? null : SettingsConfigTests.randomSettingsConfig(), randomBoolean() ? null : Instant.now(), diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java index 265f2a9446828..4a538af8fc0c6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.time.ZoneOffset; @@ -25,13 +26,22 @@ public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase { public static DateHistogramGroupSource randomDateHistogramGroupSource() { + return randomDateHistogramGroupSource(Version.CURRENT); + } + + public static DateHistogramGroupSource randomDateHistogramGroupSource(Version version) { String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20); - ScriptConfig scriptConfig = randomBoolean() ? null : ScriptConfigTests.randomScriptConfig(); + ScriptConfig scriptConfig = version.onOrAfter(Version.V_7_7_0) + ? randomBoolean() ? null : ScriptConfigTests.randomScriptConfig() + : null; + boolean missingBucket = version.onOrAfter(Version.V_8_0_0) ? 
randomBoolean() : false; // todo: V_7_10_0 + DateHistogramGroupSource dateHistogramGroupSource; if (randomBoolean()) { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, + missingBucket, new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomTimeValue(1, 100, "d", "h", "ms", "s", "m"))), randomBoolean() ? randomZone() : null ); @@ -39,6 +49,7 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, + missingBucket, new DateHistogramGroupSource.CalendarInterval( new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w", "M", "q", "y")) ), @@ -49,8 +60,12 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() { return dateHistogramGroupSource; } - public void testBackwardsSerialization() throws IOException { - DateHistogramGroupSource groupSource = randomDateHistogramGroupSource(); + public void testBackwardsSerialization72() throws IOException { + // version 7.7 introduced scripts, so test before that + DateHistogramGroupSource groupSource = randomDateHistogramGroupSource( + VersionUtils.randomVersionBetween(random(), Version.V_7_3_0, Version.V_7_7_0) + ); + try (BytesStreamOutput output = new BytesStreamOutput()) { output.setVersion(Version.V_7_2_0); groupSource.writeTo(output); @@ -82,6 +97,7 @@ public void testRoundingDateHistogramFixedInterval() { DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource( field, null, + randomBoolean(), new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval("1d")), null ); @@ -104,6 +120,7 @@ public void testRoundingDateHistogramCalendarInterval() { DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource( field, null, + randomBoolean(), new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval("1w")), null ); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSourceTests.java index ff7a5838cd0f0..28b71d8bfdb35 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSourceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSourceTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.transform.transforms.pivot; +import org.elasticsearch.Version; import org.elasticsearch.common.geo.GeoBoundingBox; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.io.stream.Writeable.Reader; @@ -20,14 +21,22 @@ public class GeoTileGroupSourceTests extends AbstractSerializingTestCase { public static GeoTileGroupSource randomGeoTileGroupSource() { + return randomGeoTileGroupSource(Version.CURRENT); + } + + public static GeoTileGroupSource randomGeoTileGroupSource(Version version) { Rectangle rectangle = GeometryTestUtils.randomRectangle(); + boolean missingBucket = version.onOrAfter(Version.V_8_0_0) ? randomBoolean() : false; // todo: V_7_10_0 return new GeoTileGroupSource( randomBoolean() ? null : randomAlphaOfLength(10), + missingBucket, randomBoolean() ? null : randomIntBetween(1, GeoTileUtils.MAX_ZOOM), - randomBoolean() ? 
null : new GeoBoundingBox( - new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()), - new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon()) - ) + randomBoolean() + ? null + : new GeoBoundingBox( + new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()), + new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon()) + ) ); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java index 5cf4261a0ba2b..be3e63e117436 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.transform.transforms.pivot; +import org.elasticsearch.Version; import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.Writeable.Reader; @@ -32,6 +33,10 @@ public class GroupConfigTests extends AbstractSerializingTestCase { private static final char[] ILLEGAL_FIELD_NAME_CHARACTERS = { '[', ']', '>' }; public static GroupConfig randomGroupConfig() { + return randomGroupConfig(Version.CURRENT); + } + + public static GroupConfig randomGroupConfig(Version version) { Map source = new LinkedHashMap<>(); Map groups = new LinkedHashMap<>(); @@ -44,16 +49,16 @@ public static GroupConfig randomGroupConfig() { Type type = randomFrom(SingleGroupSource.Type.values()); switch (type) { case TERMS: - groupBy = TermsGroupSourceTests.randomTermsGroupSource(); + groupBy = TermsGroupSourceTests.randomTermsGroupSource(version); break; case HISTOGRAM: - groupBy = HistogramGroupSourceTests.randomHistogramGroupSource(); + groupBy = HistogramGroupSourceTests.randomHistogramGroupSource(version); break; case DATE_HISTOGRAM: - groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource(); + groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource(version); break; case GEOTILE_GRID: - groupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource(); + groupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource(version); break; default: fail("unknown group source type, please implement tests and add support here"); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSourceTests.java index 08b58807e681f..e58fe2dbc02d8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSourceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSourceTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.transform.transforms.pivot; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -15,11 +16,17 @@ public class HistogramGroupSourceTests extends AbstractSerializingTestCase { public static HistogramGroupSource randomHistogramGroupSource() { - String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20); - ScriptConfig scriptConfig = randomBoolean() ? 
null : ScriptConfigTests.randomScriptConfig(); + return randomHistogramGroupSource(Version.CURRENT); + } + public static HistogramGroupSource randomHistogramGroupSource(Version version) { + String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20); + ScriptConfig scriptConfig = version.onOrAfter(Version.V_7_7_0) + ? randomBoolean() ? null : ScriptConfigTests.randomScriptConfig() + : null; + boolean missingBucket = version.onOrAfter(Version.V_8_0_0) ? randomBoolean() : false; // todo: V_7_10_0 double interval = randomDoubleBetween(Math.nextUp(0), Double.MAX_VALUE, false); - return new HistogramGroupSource(field, scriptConfig, interval); + return new HistogramGroupSource(field, scriptConfig, missingBucket, interval); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/PivotConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/PivotConfigTests.java index d1328d45c8543..fa69ba55638df 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/PivotConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/PivotConfigTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.transform.transforms.pivot; +import org.elasticsearch.Version; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.DeprecationHandler; @@ -24,16 +25,24 @@ public class PivotConfigTests extends AbstractSerializingTransformTestCase { public static PivotConfig randomPivotConfigWithDeprecatedFields() { + return randomPivotConfigWithDeprecatedFields(Version.CURRENT); + } + + public static PivotConfig randomPivotConfigWithDeprecatedFields(Version version) { return new PivotConfig( - GroupConfigTests.randomGroupConfig(), + GroupConfigTests.randomGroupConfig(version), AggregationConfigTests.randomAggregationConfig(), randomIntBetween(10, 10_000) // deprecated ); } public static PivotConfig randomPivotConfig() { + return randomPivotConfig(Version.CURRENT); + } + + public static PivotConfig randomPivotConfig(Version version) { return new PivotConfig( - GroupConfigTests.randomGroupConfig(), + GroupConfigTests.randomGroupConfig(version), AggregationConfigTests.randomAggregationConfig(), null // deprecated ); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSourceTests.java index 6966abc83adc1..ffcc269448ce1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSourceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSourceTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.transform.transforms.pivot; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -15,10 +16,16 @@ public class TermsGroupSourceTests extends AbstractSerializingTestCase { public static TermsGroupSource randomTermsGroupSource() { - String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20); - ScriptConfig scriptConfig = randomBoolean() ? 
null : ScriptConfigTests.randomScriptConfig(); + return randomTermsGroupSource(Version.CURRENT); + } - return new TermsGroupSource(field, scriptConfig); + public static TermsGroupSource randomTermsGroupSource(Version version) { + String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20); + ScriptConfig scriptConfig = version.onOrAfter(Version.V_7_7_0) + ? randomBoolean() ? null : ScriptConfigTests.randomScriptConfig() + : null; + boolean missingBucket = version.onOrAfter(Version.V_8_0_0) ? randomBoolean() : false; // todo: V_7_10_0 + return new TermsGroupSource(field, scriptConfig, missingBucket); } @Override diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java index c8982620d62ee..6d5f0489c118f 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java @@ -29,10 +29,8 @@ import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; @@ -97,16 +95,18 @@ public void testSimplePivot() throws Exception { public void testSimpleDataStreamPivot() throws Exception { String indexName = "reviews_data_stream"; - createReviewsIndex(indexName, 1000, "date", true); + createReviewsIndex(indexName, 1000, "date", true, -1, null); String transformId = "simple_data_stream_pivot"; String transformIndex = "pivot_reviews_data_stream"; setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex); - createPivotReviewsTransform(transformId, + createPivotReviewsTransform( + transformId, transformIndex, null, null, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS, - indexName); + indexName + ); startAndWaitForTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS); @@ -338,7 +338,7 @@ public void testBucketSelectorPivot() throws Exception { public void testContinuousPivot() throws Exception { String indexName = "continuous_reviews"; - createReviewsIndex(indexName); + createReviewsIndex(indexName, 1000, "date", false, 5, "user_id"); String transformId = "simple_continuous_pivot"; String transformIndex = "pivot_reviews_continuous"; setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex); @@ -360,7 +360,8 @@ public void testContinuousPivot() throws Exception { + " \"group_by\": {" + " \"reviewer\": {" + " \"terms\": {" - + " \"field\": \"user_id\"" + + " \"field\": \"user_id\"," + + " \"missing_bucket\": true" + " } } }," + " \"aggregations\": {" + " \"avg_rating\": {" @@ -376,7 +377,10 @@ public void testContinuousPivot() throws Exception { assertTrue(indexExists(transformIndex)); // get and check some users assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417); - assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72); + + // missing bucket check + 
assertOnePivotValue(transformIndex + "/_search?q=!_exists_:reviewer", 3.72); + assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846); assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_20", 3.769230769); assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918); @@ -424,6 +428,19 @@ public void testContinuousPivot() throws Exception { .append("\",\"timestamp\":") .append(dateStamp) .append("}\n"); + + bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n"); + bulk.append("{") + .append("\"business_id\":\"") + .append("business_") + .append(business) + .append("\",\"stars\":") + .append(stars) + .append(",\"location\":\"") + .append(location) + .append("\",\"timestamp\":") + .append(dateStamp) + .append("}\n"); } bulk.append("\r\n"); @@ -439,20 +456,12 @@ public void testContinuousPivot() throws Exception { // assert that other users are unchanged assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417); - assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72); + assertOnePivotValue(transformIndex + "/_search?q=!_exists_:reviewer", 4.36); assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846); assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_20", 3.769230769); - Map user26searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_26"); - assertEquals(1, XContentMapValues.extractValue("hits.total.value", user26searchResult)); - double actual = (Double) ((List) XContentMapValues.extractValue("hits.hits._source.avg_rating", user26searchResult)).get(0); - assertThat(actual, greaterThan(3.92)); - - Map user42searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_42"); - assertEquals(1, XContentMapValues.extractValue("hits.total.value", user42searchResult)); - actual = (Double) ((List) XContentMapValues.extractValue("hits.hits._source.avg_rating", user42searchResult)).get(0); - assertThat(actual, greaterThan(0.0)); - assertThat(actual, lessThan(5.0)); + assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 4.354838709); + assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_42", 2.0); } public void testHistogramPivot() throws Exception { @@ -738,7 +747,7 @@ public void testPivotWithTermsAgg() throws Exception { + " \"size\": 2" + " }}" + " } " - +" }," + + " }," + " \"rare_users\": {" + " \"rare_terms\": {" + " \"field\": \"user_id\"" @@ -769,25 +778,28 @@ public void testPivotWithTermsAgg() throws Exception { searchResult )).get(0); assertThat(commonUsers, is(not(nullValue()))); - assertThat(commonUsers, equalTo(new HashMap<>(){{ - put("user_10", - Collections.singletonMap( - "common_businesses", - new HashMap<>(){{ + assertThat(commonUsers, equalTo(new HashMap<>() { + { + put("user_10", Collections.singletonMap("common_businesses", new HashMap<>() { + { put("business_12", 6); put("business_9", 4); - }})); - put("user_0", Collections.singletonMap( - "common_businesses", - new HashMap<>(){{ - put("business_0", 35); - }})); - }})); + } + })); + put("user_0", Collections.singletonMap("common_businesses", new HashMap<>() { + { + put("business_0", 35); + } + })); + } + })); assertThat(rareUsers, is(not(nullValue()))); - assertThat(rareUsers, equalTo(new HashMap<>(){{ - put("user_5", 1); - put("user_12", 1); - }})); + assertThat(rareUsers, equalTo(new HashMap<>() { + { + put("user_5", 1); + put("user_12", 1); + } + })); } private void assertDateHistogramPivot(String indexName) throws Exception { diff --git 
a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java index 559bc510ceaca..3b9c5265d05ad 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java @@ -52,7 +52,7 @@ import static org.hamcrest.Matchers.is; public class TransformProgressIT extends ESRestTestCase { - protected void createReviewsIndex() throws Exception { + protected void createReviewsIndex(int userWithMissingBuckets) throws Exception { final int numDocs = 1000; final RestHighLevelClient restClient = new TestRestHighLevelClient(); @@ -98,12 +98,14 @@ protected void createReviewsIndex() throws Exception { String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec + "Z"; StringBuilder sourceBuilder = new StringBuilder(); - sourceBuilder.append("{\"user_id\":\"") - .append("user_") - .append(user) - .append("\",\"count\":") - .append(i) - .append(",\"business_id\":\"") + sourceBuilder.append("{"); + sourceBuilder.append("\"user_id\":\"").append("user_").append(user).append("\","); + + if (user != userWithMissingBuckets) { + sourceBuilder.append("\"count\":").append(i).append(","); + } + + sourceBuilder.append("\"business_id\":\"") .append("business_") .append(business) .append("\",\"stars\":") @@ -120,18 +122,28 @@ protected void createReviewsIndex() throws Exception { day += 1; } } - restClient.bulk(bulk, RequestOptions.DEFAULT); + BulkResponse bulkResponse = restClient.bulk(bulk, RequestOptions.DEFAULT); + assertFalse(bulkResponse.hasFailures()); restClient.indices().refresh(new RefreshRequest(REVIEWS_INDEX_NAME), RequestOptions.DEFAULT); } public void testGetProgress() throws Exception { + assertGetProgress(-1); + } + + public void testGetProgressMissingBucket() throws Exception { + assertGetProgress(randomIntBetween(1, 25)); + } + + public void assertGetProgress(int userWithMissingBuckets) throws Exception { String transformId = "get_progress_transform"; - createReviewsIndex(); + boolean missingBucket = userWithMissingBuckets > 0; + createReviewsIndex(userWithMissingBuckets); SourceConfig sourceConfig = new SourceConfig(REVIEWS_INDEX_NAME); DestConfig destConfig = new DestConfig("unnecessary", null); GroupConfig histgramGroupConfig = new GroupConfig( Collections.emptyMap(), - Collections.singletonMap("every_50", new HistogramGroupSource("count", null, 50.0)) + Collections.singletonMap("every_50", new HistogramGroupSource("count", null, missingBucket, 50.0)) ); AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("stars")); @@ -147,6 +159,12 @@ public void testGetProgress() throws Exception { assertThat(progress.getDocumentsProcessed(), equalTo(0L)); assertThat(progress.getPercentComplete(), equalTo(0.0)); + progress = getProgress(pivot, getProgressQuery(pivot, config.getSource().getIndex(), QueryBuilders.rangeQuery("stars").gte(2))); + + assertThat(progress.getTotalDocs(), equalTo(600L)); + assertThat(progress.getDocumentsProcessed(), equalTo(0L)); + assertThat(progress.getPercentComplete(), equalTo(0.0)); + progress = getProgress( pivot, getProgressQuery(pivot, config.getSource().getIndex(), 
QueryBuilders.termQuery("user_id", "user_26")) @@ -158,7 +176,7 @@ public void testGetProgress() throws Exception { histgramGroupConfig = new GroupConfig( Collections.emptyMap(), - Collections.singletonMap("every_50", new HistogramGroupSource("missing_field", null, 50.0)) + Collections.singletonMap("every_50", new HistogramGroupSource("missing_field", null, missingBucket, 50.0)) ); pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null); pivot = new Pivot(pivotConfig, transformId); @@ -168,9 +186,14 @@ public void testGetProgress() throws Exception { getProgressQuery(pivot, config.getSource().getIndex(), QueryBuilders.termQuery("user_id", "user_26")) ); - assertThat(progress.getTotalDocs(), equalTo(0L)); assertThat(progress.getDocumentsProcessed(), equalTo(0L)); - assertThat(progress.getPercentComplete(), equalTo(100.0)); + if (missingBucket) { + assertThat(progress.getTotalDocs(), equalTo(35L)); + assertThat(progress.getPercentComplete(), equalTo(0.0)); + } else { + assertThat(progress.getTotalDocs(), equalTo(0L)); + assertThat(progress.getPercentComplete(), equalTo(100.0)); + } deleteIndex(REVIEWS_INDEX_NAME); } @@ -192,10 +215,7 @@ private TransformProgress getProgress(Function function, SearchRequest searchReq function.getInitialProgressFromResponse( response, - new LatchedActionListener<>( - ActionListener.wrap(progressHolder::set, e -> { exceptionHolder.set(e); }), - latch - ) + new LatchedActionListener<>(ActionListener.wrap(progressHolder::set, e -> { exceptionHolder.set(e); }), latch) ); } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index f7b7107e4a3cd..6e94e3225a6e1 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -80,61 +80,17 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE return super.buildClient(settings, hosts); } - protected void createReviewsIndex(String indexName, int numDocs, String dateType, boolean isDataStream) throws IOException { - int[] distributionTable = { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1 }; - - // create mapping - try (XContentBuilder builder = jsonBuilder()) { - builder.startObject(); - { - - builder.startObject("mappings").startObject("properties"); - builder.startObject("@timestamp").field("type", dateType); - if (dateType.equals("date_nanos")) { - builder.field("format", "strict_date_optional_time_nanos"); - } - builder.endObject(); - builder.startObject("timestamp").field("type", dateType); - if (dateType.equals("date_nanos")) { - builder.field("format", "strict_date_optional_time_nanos"); - } - builder.endObject() - .startObject("user_id") - .field("type", "keyword") - .endObject() - .startObject("business_id") - .field("type", "keyword") - .endObject() - .startObject("stars") - .field("type", "integer") - .endObject() - .startObject("location") - .field("type", "geo_point") - .endObject() - .endObject() - .endObject(); - } - builder.endObject(); - if (isDataStream) { - Request createCompositeTemplate = new Request("PUT", "_index_template/" + indexName + "_template"); - createCompositeTemplate.setJsonEntity( - 
"{\n" + - " \"index_patterns\": [ \"" + indexName + "\" ],\n" + - " \"data_stream\": {\n" + - " },\n" + - " \"template\": \n" + Strings.toString(builder) + - "}" - ); - client().performRequest(createCompositeTemplate); - client().performRequest(new Request("PUT", "_data_stream/" + indexName)); - } else { - final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); - Request req = new Request("PUT", indexName); - req.setEntity(entity); - client().performRequest(req); - } - } + protected void createReviewsIndex( + String indexName, + int numDocs, + String dateType, + boolean isDataStream, + int userWithMissingBuckets, + String missingBucketField + ) throws IOException { + putReviewsIndex(indexName, dateType, isDataStream); + int[] distributionTable = { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1 }; // create index final StringBuilder bulk = new StringBuilder(); int day = 10; @@ -161,21 +117,26 @@ protected void createReviewsIndex(String indexName, int numDocs, String dateType } date_string += "Z"; - bulk.append("{\"user_id\":\"") - .append("user_") - .append(user) - .append("\",\"business_id\":\"") - .append("business_") - .append(business) - .append("\",\"stars\":") - .append(stars) - .append(",\"location\":\"") - .append(location) - .append("\",\"timestamp\":\"") - .append(date_string) - .append("\",\"@timestamp\":\"") - .append(date_string) - .append("\"}\n"); + bulk.append("{"); + if ((user == userWithMissingBuckets && missingBucketField.equals("user_id")) == false) { + bulk.append("\"user_id\":\"").append("user_").append(user).append("\","); + } + if ((user == userWithMissingBuckets && missingBucketField.equals("business_id")) == false) { + bulk.append("\"business_id\":\"").append("business_").append(business).append("\","); + } + if ((user == userWithMissingBuckets && missingBucketField.equals("stars")) == false) { + bulk.append("\"stars\":").append(stars).append(","); + } + if ((user == userWithMissingBuckets && missingBucketField.equals("location")) == false) { + bulk.append("\"location\":\"").append(location).append("\","); + } + if ((user == userWithMissingBuckets && missingBucketField.equals("timestamp")) == false) { + bulk.append("\"timestamp\":\"").append(date_string).append("\","); + } + + // always add @timestamp to avoid complicated logic regarding ',' + bulk.append("\"@timestamp\":\"").append(date_string).append("\""); + bulk.append("}\n"); if (i % 50 == 0) { bulk.append("\r\n"); @@ -196,6 +157,62 @@ protected void createReviewsIndex(String indexName, int numDocs, String dateType client().performRequest(bulkRequest); } + protected void putReviewsIndex(String indexName, String dateType, boolean isDataStream) throws IOException { + // create mapping + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("mappings").startObject("properties"); + builder.startObject("@timestamp").field("type", dateType); + if (dateType.equals("date_nanos")) { + builder.field("format", "strict_date_optional_time_nanos"); + } + builder.endObject(); + builder.startObject("timestamp").field("type", dateType); + if (dateType.equals("date_nanos")) { + builder.field("format", "strict_date_optional_time_nanos"); + } + builder.endObject() + .startObject("user_id") + .field("type", "keyword") + .endObject() + .startObject("business_id") + .field("type", "keyword") + .endObject() + .startObject("stars") + .field("type", "integer") + .endObject() + .startObject("location") + .field("type", "geo_point") + 
.endObject() + .endObject() + .endObject(); + } + builder.endObject(); + if (isDataStream) { + Request createCompositeTemplate = new Request("PUT", "_index_template/" + indexName + "_template"); + createCompositeTemplate.setJsonEntity( + "{\n" + + " \"index_patterns\": [ \"" + + indexName + + "\" ],\n" + + " \"data_stream\": {\n" + + " },\n" + + " \"template\": \n" + + Strings.toString(builder) + + "}" + ); + client().performRequest(createCompositeTemplate); + client().performRequest(new Request("PUT", "_data_stream/" + indexName)); + } else { + final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); + Request req = new Request("PUT", indexName); + req.setEntity(entity); + client().performRequest(req); + } + } + } + /** * Create a simple dataset for testing with reviewers, ratings and businesses */ @@ -204,7 +221,7 @@ protected void createReviewsIndex() throws IOException { } protected void createReviewsIndex(String indexName) throws IOException { - createReviewsIndex(indexName, 1000, "date", false); + createReviewsIndex(indexName, 1000, "date", false, -1, null); } protected void createPivotReviewsTransform(String transformId, String transformIndex, String query) throws IOException { @@ -217,7 +234,7 @@ protected void createPivotReviewsTransform(String transformId, String transformI } protected void createReviewsIndexNano() throws IOException { - createReviewsIndex(REVIEWS_DATE_NANO_INDEX_NAME, 1000, "date_nanos", false); + createReviewsIndex(REVIEWS_DATE_NANO_INDEX_NAME, 1000, "date_nanos", false, -1, null); } protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader) throws IOException { @@ -247,12 +264,14 @@ protected void createContinuousPivotReviewsTransform(String transformId, String assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } - protected void createPivotReviewsTransform(String transformId, - String transformIndex, - String query, - String pipeline, - String authHeader, - String sourceIndex) throws IOException { + protected void createPivotReviewsTransform( + String transformId, + String transformIndex, + String query, + String pipeline, + String authHeader, + String sourceIndex + ) throws IOException { final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader); String config = "{"; diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java index 2bfc96bf2b92a..5502feaa5394a 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java @@ -64,7 +64,7 @@ public void cleanUpPotentiallyFailedTransform() throws Exception { public void testForceStopFailedTransform() throws Exception { String transformId = "test-force-stop-failed-transform"; - createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false); + createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false, -1, null); String transformIndex = "failure_pivot_reviews"; createDestinationIndexWithBadMapping(transformIndex); createContinuousPivotReviewsTransform(transformId, transformIndex, 
null); @@ -102,7 +102,7 @@ public void testForceStopFailedTransform() throws Exception { public void testStartFailedTransform() throws Exception { String transformId = "test-force-start-failed-transform"; - createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false); + createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false, -1, null); String transformIndex = "failure_pivot_reviews"; createDestinationIndexWithBadMapping(transformIndex); createContinuousPivotReviewsTransform(transformId, transformIndex, null); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java index 97d349ed251c7..26c8275ee68ed 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java @@ -92,6 +92,13 @@ public interface ChangeCollector { * @return the position, null in case the collector is exhausted */ Map getBucketPosition(); + + /** + * Whether the collector optimizes change detection by narrowing the required query. + * + * @return true if the collector optimizes change detection + */ + boolean isOptimized(); } /** diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 64b083328ba4e..76223489a5e7c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -347,6 +347,21 @@ protected void initializeFunction() { if (isContinuous()) { changeCollector = function.buildChangeCollector(getConfig().getSyncConfig().getField()); + + if (changeCollector.isOptimized() == false) { + logger.warn( + new ParameterizedMessage( + "[{}] could not find any optimizations for continuous execution, " + + "this transform might run slowly, please check your configuration.", + getJobId() + ) + ); + auditor.warning( + getJobId(), + "could not find any optimizations for continuous execution, " + + "this transform might run slowly, please check your configuration." + ); + } } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java index 7c8c77176467b..09cb528e0f4a5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.geometry.Rectangle; import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.ExistsQueryBuilder; import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -90,18 +91,30 @@ interface FieldCollector { * Clear the field collector, e.g. the changes to free up memory. 
*/ void clear(); + + /** + * Whether the collector optimizes change detection by narrowing the required query. + * + * @return true if the collector optimizes change detection + */ + boolean isOptimized(); } static class TermsFieldCollector implements FieldCollector { private final String sourceFieldName; private final String targetFieldName; + private final boolean missingBucket; private final Set changedTerms; + // although we could add null to the hash set, its easier to handle null separately + private boolean foundNullBucket; - TermsFieldCollector(final String sourceFieldName, final String targetFieldName) { + TermsFieldCollector(final String sourceFieldName, final String targetFieldName, final boolean missingBucket) { this.sourceFieldName = sourceFieldName; this.targetFieldName = targetFieldName; + this.missingBucket = missingBucket; this.changedTerms = new HashSet<>(); + this.foundNullBucket = false; } @Override @@ -114,11 +127,16 @@ public int getMaxPageSize() { @Override public boolean collectChanges(Collection buckets) { changedTerms.clear(); + foundNullBucket = false; for (Bucket b : buckets) { Object term = b.getKey().get(targetFieldName); if (term != null) { changedTerms.add(term.toString()); + } else { + // we should not find a null bucket if missing bucket is false + assert missingBucket; + foundNullBucket = true; } } @@ -127,7 +145,44 @@ public boolean collectChanges(Collection buckets) { @Override public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) { - if (changedTerms.isEmpty() == false) { + if (missingBucket && foundNullBucket) { + QueryBuilder missingBucketQuery = new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(sourceFieldName)); + + if (changedTerms.isEmpty()) { + return missingBucketQuery; + } + + /** + * Combined query with terms and missing bucket: + * + * "bool": { + * "should": [ + * { + * "terms": { + * "source_field": [ + * "term1", + * "term2", + * ... 
+ * ] + * } + * }, + * { + * "bool": { + * "must_not": [ + * { + * "exists": { + * "field": "source_field" + * } + * } + * ] + * } + * } + * ] + * } + */ + return new BoolQueryBuilder().should(new TermsQueryBuilder(sourceFieldName, changedTerms)).should(missingBucketQuery); + + } else if (changedTerms.isEmpty() == false) { return new TermsQueryBuilder(sourceFieldName, changedTerms); } return null; @@ -136,31 +191,43 @@ public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheck @Override public void clear() { changedTerms.clear(); + foundNullBucket = false; } @Override public AggregationBuilder aggregateChanges() { return null; } + + @Override + public boolean isOptimized() { + return true; + } } static class DateHistogramFieldCollector implements FieldCollector { private final String sourceFieldName; private final String targetFieldName; - private final boolean isSynchronizationField; + private final boolean missingBucket; + private final boolean applyOptimizationForSyncField; private final Rounding.Prepared rounding; DateHistogramFieldCollector( final String sourceFieldName, final String targetFieldName, + final boolean missingBucket, final Rounding.Prepared rounding, final boolean isSynchronizationField ) { this.sourceFieldName = sourceFieldName; this.targetFieldName = targetFieldName; + this.missingBucket = missingBucket; this.rounding = rounding; - this.isSynchronizationField = isSynchronizationField; + + // if missing_bucket is set to true, we can't apply the optimization, note: this combination + // is illogical, because the sync field should be steady + this.applyOptimizationForSyncField = isSynchronizationField && (missingBucket == false); } @Override @@ -176,7 +243,9 @@ public boolean collectChanges(Collection buckets) { @Override public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) { - if (isSynchronizationField && lastCheckpointTimestamp > 0) { + + if (applyOptimizationForSyncField && lastCheckpointTimestamp > 0) { + assert missingBucket == false; return new RangeQueryBuilder(sourceFieldName).gte(rounding.round(lastCheckpointTimestamp)).format("epoch_millis"); } @@ -192,16 +261,24 @@ public void clear() {} public AggregationBuilder aggregateChanges() { return null; } + + @Override + public boolean isOptimized() { + // we only have 1 optimization + return applyOptimizationForSyncField; + } } static class HistogramFieldCollector implements FieldCollector { private final String sourceFieldName; private final String targetFieldName; + private final boolean missingBucket; - HistogramFieldCollector(final String sourceFieldName, final String targetFieldName) { + HistogramFieldCollector(final String sourceFieldName, final String targetFieldName, final boolean missingBucket) { this.sourceFieldName = sourceFieldName; this.targetFieldName = targetFieldName; + this.missingBucket = missingBucket; } @Override @@ -226,18 +303,28 @@ public void clear() {} public AggregationBuilder aggregateChanges() { return null; } + + @Override + public boolean isOptimized() { + return false; + } } static class GeoTileFieldCollector implements FieldCollector { private final String sourceFieldName; private final String targetFieldName; + private final boolean missingBucket; private final Set changedBuckets; + // although we could add null to the hash set, its easier to handle null separately + private boolean foundNullBucket; - GeoTileFieldCollector(final String sourceFieldName, final String targetFieldName) { + GeoTileFieldCollector(final String 
sourceFieldName, final String targetFieldName, final boolean missingBucket) { this.sourceFieldName = sourceFieldName; this.targetFieldName = targetFieldName; + this.missingBucket = missingBucket; this.changedBuckets = new HashSet<>(); + this.foundNullBucket = false; } @Override @@ -249,11 +336,16 @@ public int getMaxPageSize() { @Override public boolean collectChanges(Collection buckets) { changedBuckets.clear(); + foundNullBucket = false; for (Bucket b : buckets) { Object bucket = b.getKey().get(targetFieldName); if (bucket != null) { changedBuckets.add(bucket.toString()); + } else { + // we should not find a null bucket if missing bucket is false + assert missingBucket; + foundNullBucket = true; } } @@ -262,16 +354,69 @@ public boolean collectChanges(Collection buckets) { @Override public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) { - if (changedBuckets != null && changedBuckets.isEmpty() == false) { - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - changedBuckets.stream().map(GeoTileUtils::toBoundingBox).map(this::toGeoQuery).forEach(boolQueryBuilder::should); - return boolQueryBuilder; + BoolQueryBuilder boundingBoxesQueryBuilder = null; + + if (changedBuckets.isEmpty() == false) { + boundingBoxesQueryBuilder = QueryBuilders.boolQuery(); + changedBuckets.stream().map(GeoTileUtils::toBoundingBox).map(this::toGeoQuery).forEach(boundingBoxesQueryBuilder::should); } - return null; + + if (missingBucket && foundNullBucket) { + QueryBuilder missingBucketQuery = new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(sourceFieldName)); + + if (boundingBoxesQueryBuilder == null) { + return missingBucketQuery; + } + + /** + * Combined query with geo bounding boxes and missing bucket: + * + * "bool": { + * "should": [ + * { + * "geo_bounding_box": { + * "source_field": { + * "top_left": { + * "lat": x1, + * "lon": y1 + * }, + * "bottom_right": { + * "lat": x2, + * "lon": y2 + * } + * } + * } + * }, + * { + * "geo_bounding_box": { + * ... 
+ * } + * }, + * { + * "bool": { + * "must_not": [ + * { + * "exists": { + * "field": "source_field" + * } + * } + * ] + * } + * } + * ] + * } + */ + return boundingBoxesQueryBuilder.should(missingBucketQuery); + } + + return boundingBoxesQueryBuilder; } @Override - public void clear() {} + public void clear() { + changedBuckets.clear(); + foundNullBucket = false; + } @Override public AggregationBuilder aggregateChanges() { @@ -285,9 +430,14 @@ private GeoBoundingBoxQueryBuilder toGeoQuery(Rectangle rectangle) { new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon()) ); } + + @Override + public boolean isOptimized() { + return true; + } } - public CompositeBucketsChangeCollector( + private CompositeBucketsChangeCollector( @Nullable CompositeAggregationBuilder compositeAggregation, Map fieldCollectors ) { @@ -368,6 +518,11 @@ public Map getBucketPosition() { return afterKey; } + @Override + public boolean isOptimized() { + return fieldCollectors.values().stream().anyMatch(FieldCollector::isOptimized); + } + public static ChangeCollector buildChangeCollector( @Nullable CompositeAggregationBuilder compositeAggregationBuilder, Map groups, @@ -385,13 +540,21 @@ static Map createFieldCollectors(Map createFieldCollectors(Map createFieldCollectors(Map, Map> processSearchResponse( public SearchSourceBuilder buildSearchQueryForInitialProgress(SearchSourceBuilder searchSourceBuilder) { BoolQueryBuilder existsClauses = QueryBuilders.boolQuery(); - config.getGroupConfig() - .getGroups() - .values() - // TODO change once we allow missing_buckets - .forEach(src -> { - if (src.getField() != null) { - existsClauses.must(QueryBuilders.existsQuery(src.getField())); - } - }); + config.getGroupConfig().getGroups().values().forEach(src -> { + if (src.getMissingBucket() == false && src.getField() != null) { + existsClauses.must(QueryBuilders.existsQuery(src.getField())); + } + }); return searchSourceBuilder.query(existsClauses).size(0).trackTotalHits(true); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java index 5a24a542634e0..2ee5a0553f127 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java @@ -102,7 +102,7 @@ public void testTermsFieldCollector() throws IOException { Map groups = new LinkedHashMap<>(); // a terms group_by is limited by terms query - SingleGroupSource termsGroupBy = new TermsGroupSource("id", null); + SingleGroupSource termsGroupBy = new TermsGroupSource("id", null, false); groups.put("id", termsGroupBy); ChangeCollector collector = CompositeBucketsChangeCollector.buildChangeCollector(getCompositeAggregation(groups), groups, null);
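When missing_bucket is enabled and collectChanges() has seen a null composite key, the terms collector widens its changes filter from a plain terms query to a bool query that ORs the changed terms with a must_not/exists clause, as sketched in the javadoc examples above. A minimal, self-contained sketch of that query shape follows; it mirrors TermsFieldCollector#filterByChanges, and the class and method names are illustrative rather than part of the patch.

import java.util.Set;

import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.ExistsQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;

class ChangedTermsFilterSketch {

    /** Changed terms OR "source field is missing"; returns null when nothing changed. */
    static QueryBuilder filterByChanges(String sourceField, Set<String> changedTerms, boolean foundNullBucket) {
        if (foundNullBucket) {
            // documents that fell into the missing bucket have no value for the source field
            QueryBuilder missingBucketQuery = new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(sourceField));
            if (changedTerms.isEmpty()) {
                return missingBucketQuery;
            }
            return new BoolQueryBuilder().should(new TermsQueryBuilder(sourceField, changedTerms)).should(missingBucketQuery);
        }
        return changedTerms.isEmpty() ? null : new TermsQueryBuilder(sourceField, changedTerms);
    }
}

The geo-tile collector applies the same pattern, adding the must_not/exists clause as one more should clause next to its bounding-box queries, while the date-histogram collector simply drops its sync-field range optimization when missing_bucket is set; that is why TransformIndexer now logs and audits a warning when no field collector reports isOptimized().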
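The wiring of missing_bucket into the composite aggregation sources is not part of the hunks shown here, but the null keys that collectChanges() handles come from composite value sources configured with missingBucket(true): documents that lack the grouped field are then reported under a null key instead of being dropped, which is also why the collectors assert missingBucket when they encounter one. A rough sketch of such a source, using the standard composite aggregation builders (aggregation and field names are illustrative):

import java.util.List;

import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;

class MissingBucketCompositeSketch {

    static CompositeAggregationBuilder reviewerChanges() {
        // group "reviewer" by "user_id"; with missing_bucket, documents without a user_id
        // come back as a bucket whose key for "reviewer" is null
        TermsValuesSourceBuilder reviewer = new TermsValuesSourceBuilder("reviewer").field("user_id").missingBucket(true);
        return new CompositeAggregationBuilder("changes", List.of(reviewer));
    }
}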