From e9944c4dbb6cd8d1e78a32f018df85aacd7d095d Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 7 May 2019 16:28:35 -0500 Subject: [PATCH 1/4] [ML] adding pivot.size option for setting paging size --- .../transforms/pivot/PivotConfig.java | 38 ++++++++++++++++--- .../transforms/pivot/PivotConfigTests.java | 4 +- .../DataFrameTransformDocumentationIT.java | 5 ++- .../dataframe/put_data_frame.asciidoc | 5 +++ .../xpack/core/dataframe/DataFrameField.java | 1 + .../action/PutDataFrameTransformAction.java | 12 +++++- .../transforms/pivot/PivotConfig.java | 24 ++++++++++-- .../transforms/pivot/PivotConfigTests.java | 8 +++- .../integration/DataFrameIntegTestCase.java | 8 +++- .../dataframe/transforms/pivot/Pivot.java | 8 ++-- .../transforms/DataFrameIndexerTests.java | 29 +++++++++----- .../transforms/pivot/PivotTests.java | 14 ++++++- .../test/data_frame/transforms_crud.yml | 30 +++++++++++++++ 13 files changed, 155 insertions(+), 31 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java index 0c3a6e3ea890b..75fbc9c81e2d8 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java @@ -39,25 +39,29 @@ public class PivotConfig implements ToXContentObject { private static final ParseField GROUP_BY = new ParseField("group_by"); private static final ParseField AGGREGATIONS = new ParseField("aggregations"); + private static final ParseField SIZE = new ParseField("size"); private final GroupConfig groups; private final AggregationConfig aggregationConfig; + private final Integer size; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("pivot_config", true, - args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1])); + args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1], (Integer) args[2])); static { PARSER.declareObject(constructorArg(), (p, c) -> (GroupConfig.fromXContent(p)), GROUP_BY); PARSER.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p), AGGREGATIONS); + PARSER.declareInt(optionalConstructorArg(), SIZE); } public static PivotConfig fromXContent(final XContentParser parser) { return PARSER.apply(parser, null); } - PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig) { + PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig, Integer size) { this.groups = groups; this.aggregationConfig = aggregationConfig; + this.size = size; } @Override @@ -65,6 +69,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(GROUP_BY.getPreferredName(), groups); builder.field(AGGREGATIONS.getPreferredName(), aggregationConfig); + if (size != null) { + builder.field(SIZE.getPreferredName(), size); + } builder.endObject(); return builder; } @@ -77,6 +84,10 @@ public GroupConfig getGroupConfig() { return groups; } + public Integer getSize() { + return size; + } + @Override public boolean equals(Object other) { if (this == other) { @@ -89,12 +100,14 @@ public boolean equals(Object other) { final PivotConfig that = (PivotConfig) other; - return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig); + return Objects.equals(this.groups, that.groups) + && Objects.equals(this.aggregationConfig, that.aggregationConfig) + && Objects.equals(this.size, that.size); } @Override public int hashCode() { - return Objects.hash(groups, aggregationConfig); + return Objects.hash(groups, aggregationConfig, size); } public static Builder builder() { @@ -104,6 +117,7 @@ public static Builder builder() { public static class Builder { private GroupConfig groups; private AggregationConfig aggregationConfig; + private Integer size; /** * Set how to group the source data @@ -135,8 +149,22 @@ public Builder setAggregations(AggregatorFactories.Builder aggregations) { return this; } + /** + * Sets the paging maximum paging size that date frame transform can use when + * pulling the data from the source index. + * + * If OOM is triggered, the paging size is dynamically reduced so that the transform can continue to gather data. + * + * @param size Integer value between 10 and 10_000 + * @return the {@link Builder} with the paging size set. + */ + public Builder setSize(Integer size) { + this.size = size; + return this; + } + public PivotConfig build() { - return new PivotConfig(groups, aggregationConfig); + return new PivotConfig(groups, aggregationConfig, size); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfigTests.java index d2e036d9f1ad2..5cafcb9f419b5 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfigTests.java @@ -32,7 +32,9 @@ public class PivotConfigTests extends AbstractXContentTestCase { public static PivotConfig randomPivotConfig() { - return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig()); + return new PivotConfig(GroupConfigTests.randomGroupConfig(), + AggregationConfigTests.randomAggregationConfig(), + randomBoolean() ? null : randomIntBetween(10, 10_000)); } @Override diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index 3c5059279b44d..f541689922819 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -137,8 +137,9 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException // end::put-data-frame-transform-agg-config // tag::put-data-frame-transform-pivot-config PivotConfig pivotConfig = PivotConfig.builder() - .setGroups(groupConfig) - .setAggregationConfig(aggConfig) + .setGroups(groupConfig) // <1> + .setAggregationConfig(aggConfig) // <2> + .setSize(1000) // <3> .build(); // end::put-data-frame-transform-pivot-config // tag::put-data-frame-transform-config diff --git a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc index bb1b20aaa1a52..567449c9c25b1 100644 --- a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc +++ b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc @@ -66,6 +66,11 @@ Defines the pivot function `group by` fields and the aggregation to reduce the d -------------------------------------------------- include-tagged::{doc-tests-file}[{api}-pivot-config] -------------------------------------------------- +<1> The `GroupConfig` to use in the pivot +<2> The aggregations to use +<3> The maximum paging size for the transform when pulling data +from the source. The size dynamically adjusts as the transform +is running to recover from and prevent OOM issues. ===== GroupConfig The grouping terms. Defines the group by and destination fields diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index 71bf14cdeb4a5..40ee0c8a7292d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -27,6 +27,7 @@ public final class DataFrameField { public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DESTINATION = new ParseField("dest"); public static final ParseField FORCE = new ParseField("force"); + public static final ParseField SIZE = new ParseField("size"); /** * Fields for checkpointing diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java index 0f6cc63f98851..6c6cfa7f1208d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.Objects; +import static org.elasticsearch.action.ValidateActions.addValidationError; + public class PutDataFrameTransformAction extends Action { public static final PutDataFrameTransformAction INSTANCE = new PutDataFrameTransformAction(); @@ -53,7 +55,15 @@ public static Request fromXContent(final XContentParser parser, final String id) @Override public ActionRequestValidationException validate() { - return null; + ActionRequestValidationException validationException = null; + if(config.getPivotConfig() != null + && config.getPivotConfig().getSize() != null + && (config.getPivotConfig().getSize() < 10 || config.getPivotConfig().getSize() > 10_000)) { + validationException = addValidationError( + "pivot.size [" + config.getPivotConfig().getSize() + "] must be greater than 10 and less than 10,000", + validationException); + } + return validationException; } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java index c1c894e2971ae..e016a54c8d0ee 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.dataframe.transforms.pivot; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -29,6 +30,7 @@ public class PivotConfig implements Writeable, ToXContentObject { private static final String NAME = "data_frame_transform_pivot"; private final GroupConfig groups; private final AggregationConfig aggregationConfig; + private final Integer size; private static final ConstructingObjectParser STRICT_PARSER = createParser(false); private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); @@ -55,7 +57,7 @@ private static ConstructingObjectParser createParser(boolean throw new IllegalArgumentException("Required [aggregations]"); } - return new PivotConfig(groups, aggregationConfig); + return new PivotConfig(groups, aggregationConfig, (Integer)args[3]); }); parser.declareObject(constructorArg(), @@ -63,18 +65,21 @@ private static ConstructingObjectParser createParser(boolean parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGREGATIONS); parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGS); + parser.declareInt(optionalConstructorArg(), DataFrameField.SIZE); return parser; } - public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig) { + public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig, Integer size) { this.groups = ExceptionsHelper.requireNonNull(groups, DataFrameField.GROUP_BY.getPreferredName()); this.aggregationConfig = ExceptionsHelper.requireNonNull(aggregationConfig, DataFrameField.AGGREGATIONS.getPreferredName()); + this.size = size; } public PivotConfig(StreamInput in) throws IOException { this.groups = new GroupConfig(in); this.aggregationConfig = new AggregationConfig(in); + this.size = in.readOptionalInt(); } @Override @@ -82,6 +87,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(DataFrameField.GROUP_BY.getPreferredName(), groups); builder.field(DataFrameField.AGGREGATIONS.getPreferredName(), aggregationConfig); + if (size != null) { + builder.field(DataFrameField.SIZE.getPreferredName(), size); + } builder.endObject(); return builder; } @@ -107,6 +115,7 @@ public void toCompositeAggXContent(XContentBuilder builder, Params params) throw public void writeTo(StreamOutput out) throws IOException { groups.writeTo(out); aggregationConfig.writeTo(out); + out.writeOptionalInt(size); } public AggregationConfig getAggregationConfig() { @@ -117,6 +126,11 @@ public GroupConfig getGroupConfig() { return groups; } + @Nullable + public Integer getSize() { + return size; + } + @Override public boolean equals(Object other) { if (this == other) { @@ -129,12 +143,14 @@ public boolean equals(Object other) { final PivotConfig that = (PivotConfig) other; - return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig); + return Objects.equals(this.groups, that.groups) + && Objects.equals(this.aggregationConfig, that.aggregationConfig) + && Objects.equals(this.size, that.size); } @Override public int hashCode() { - return Objects.hash(groups, aggregationConfig); + return Objects.hash(groups, aggregationConfig, size); } public boolean isValid() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfigTests.java index 1586ea540f4b4..0435ce9ec0a4e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfigTests.java @@ -17,11 +17,15 @@ public class PivotConfigTests extends AbstractSerializingDataFrameTestCase { public static PivotConfig randomPivotConfig() { - return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig()); + return new PivotConfig(GroupConfigTests.randomGroupConfig(), + AggregationConfigTests.randomAggregationConfig(), + randomBoolean() ? null : randomIntBetween(10, 10_000)); } public static PivotConfig randomInvalidPivotConfig() { - return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomInvalidAggregationConfig()); + return new PivotConfig(GroupConfigTests.randomGroupConfig(), + AggregationConfigTests.randomInvalidAggregationConfig(), + randomBoolean() ? null : randomIntBetween(10, 10_000)); } @Override diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java index 84f3e05de5cd1..2dd116dfd6668 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java @@ -172,7 +172,13 @@ protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregat protected PivotConfig createPivotConfig(Map groups, AggregatorFactories.Builder aggregations) throws Exception { - return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations)); + return createPivotConfig(groups, aggregations, null); + } + + protected PivotConfig createPivotConfig(Map groups, + AggregatorFactories.Builder aggregations, + Integer size) throws Exception { + return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations), size); } protected DataFrameTransformConfig createTransformConfig(String id, diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java index 0e5231442d18b..d33d960620db0 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java @@ -76,13 +76,15 @@ public void deduceMappings(Client client, SourceConfig sourceConfig, final Actio * the page size, the type of aggregations and the data. As the page size is the number of buckets we return * per page the page size is a multiplier for the costs of aggregating bucket. * - * Initially this returns a default, in future it might inspect the configuration and base the initial size - * on the aggregations used. + * The user may set a maximum in the {@link PivotConfig#getSize()}, but if that is not provided, + * the default {@link Pivot#DEFAULT_INITIAL_PAGE_SIZE} is used. + * + * In future we might inspect the configuration and base the initial size on the aggregations used. * * @return the page size */ public int getInitialPageSize() { - return DEFAULT_INITIAL_PAGE_SIZE; + return config.getSize() == null ? DEFAULT_INITIAL_PAGE_SIZE : config.getSize(); } public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map position, int pageSize) { diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java index f3f3255f07a6d..43198c6edfcf3 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java @@ -23,7 +23,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfigTests; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfigTests; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; @@ -39,7 +41,10 @@ import java.util.function.Consumer; import java.util.function.Function; +import static org.elasticsearch.xpack.core.dataframe.transforms.DestConfigTests.randomDestConfig; +import static org.elasticsearch.xpack.core.dataframe.transforms.SourceConfigTests.randomSourceConfig; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -169,9 +174,15 @@ public void setUpMocks() { } public void testPageSizeAdapt() throws InterruptedException { - DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfig(); + Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000); + DataFrameTransformConfig config = new DataFrameTransformConfig(randomAlphaOfLength(10), + randomSourceConfig(), + randomDestConfig(), + null, + new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize), + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - + final long initialPageSize = pageSize == null ? Pivot.DEFAULT_INITIAL_PAGE_SIZE : pageSize; Function searchFunction = searchRequest -> { throw new SearchPhaseExecutionException("query", "Partial shards failure", new ShardSearchFailure[] { new ShardSearchFailure(new CircuitBreakingException("to much memory", 110, 100, Durability.TRANSIENT)) }); @@ -179,9 +190,7 @@ public void testPageSizeAdapt() throws InterruptedException { Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - Consumer failureConsumer = e -> { - fail("expected circuit breaker exception to be handled"); - }; + Consumer failureConsumer = e -> fail("expected circuit breaker exception to be handled"); final ExecutorService executor = Executors.newFixedThreadPool(1); try { @@ -197,8 +206,8 @@ public void testPageSizeAdapt() throws InterruptedException { latch.countDown(); awaitBusy(() -> indexer.getState() == IndexerState.STOPPED); long pageSizeAfterFirstReduction = indexer.getPageSize(); - assertTrue(Pivot.DEFAULT_INITIAL_PAGE_SIZE > pageSizeAfterFirstReduction); - assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE); + assertThat(initialPageSize, greaterThan(pageSizeAfterFirstReduction)); + assertThat(pageSizeAfterFirstReduction, greaterThan((long)DataFrameIndexer.MINIMUM_PAGE_SIZE)); // run indexer a 2nd time final CountDownLatch secondRunLatch = indexer.newLatch(1); @@ -211,8 +220,8 @@ public void testPageSizeAdapt() throws InterruptedException { awaitBusy(() -> indexer.getState() == IndexerState.STOPPED); // assert that page size has been reduced again - assertTrue(pageSizeAfterFirstReduction > indexer.getPageSize()); - assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE); + assertThat(pageSizeAfterFirstReduction, greaterThan((long)indexer.getPageSize())); + assertThat(pageSizeAfterFirstReduction, greaterThan((long)DataFrameIndexer.MINIMUM_PAGE_SIZE)); } finally { executor.shutdownNow(); diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java index 4c434cdbee7fd..20ea84502ed82 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java @@ -107,6 +107,16 @@ public void testValidateNonExistingIndex() throws Exception { assertInvalidTransform(client, source, pivot); } + public void testInitialPageSize() throws Exception { + int expectedPageSize = 1000; + + Pivot pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), expectedPageSize)); + assertThat(pivot.getInitialPageSize(), equalTo(expectedPageSize)); + + pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null)); + assertThat(pivot.getInitialPageSize(), equalTo(Pivot.DEFAULT_INITIAL_PAGE_SIZE)); + } + public void testSearchFailure() throws Exception { // test a failure during the search operation, transform creation fails if // search has failures although they might just be temporary @@ -177,11 +187,11 @@ protected void } private PivotConfig getValidPivotConfig() throws IOException { - return new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig()); + return new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null); } private PivotConfig getValidPivotConfig(AggregationConfig aggregationConfig) throws IOException { - return new PivotConfig(GroupConfigTests.randomGroupConfig(), aggregationConfig); + return new PivotConfig(GroupConfigTests.randomGroupConfig(), aggregationConfig, null); } private AggregationConfig getValidAggregationConfig() throws IOException { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index fa608cefd1eb3..a6330dad8b12e 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -302,3 +302,33 @@ setup: "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} } } +--- +"Test put config with invalid pivot size": + - do: + catch: /pivot\.size \[5\] must be greater than 10 and less than 10,000/ + data_frame.put_data_frame_transform: + transform_id: "airline-transform" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-dest-index" }, + "pivot": { + "size": 5, + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - do: + catch: /pivot\.size \[15000\] must be greater than 10 and less than 10,000/ + data_frame.put_data_frame_transform: + transform_id: "airline-transform" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-dest-index" }, + "pivot": { + "size": 15000, + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } From 472f2c34bdeb2d49d256213c13aee30534b7d530 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 8 May 2019 16:16:07 -0500 Subject: [PATCH 2/4] Changing field name to address PR comments --- .../xpack/core/dataframe/DataFrameField.java | 2 +- .../action/PutDataFrameTransformAction.java | 7 +++--- .../transforms/pivot/PivotConfig.java | 24 +++++++++---------- .../dataframe/transforms/pivot/Pivot.java | 4 ++-- .../test/data_frame/transforms_crud.yml | 8 +++---- 5 files changed, 23 insertions(+), 22 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index 40ee0c8a7292d..c61ed2ddde8be 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -27,7 +27,7 @@ public final class DataFrameField { public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DESTINATION = new ParseField("dest"); public static final ParseField FORCE = new ParseField("force"); - public static final ParseField SIZE = new ParseField("size"); + public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size"); /** * Fields for checkpointing diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java index 6c6cfa7f1208d..7a0c2ca9e480d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java @@ -57,10 +57,11 @@ public static Request fromXContent(final XContentParser parser, final String id) public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; if(config.getPivotConfig() != null - && config.getPivotConfig().getSize() != null - && (config.getPivotConfig().getSize() < 10 || config.getPivotConfig().getSize() > 10_000)) { + && config.getPivotConfig().getMaxPageSearchSize() != null + && (config.getPivotConfig().getMaxPageSearchSize() < 10 || config.getPivotConfig().getMaxPageSearchSize() > 10_000)) { validationException = addValidationError( - "pivot.size [" + config.getPivotConfig().getSize() + "] must be greater than 10 and less than 10,000", + "pivot.max_page_search_size [" + + config.getPivotConfig().getMaxPageSearchSize() + "] must be greater than 10 and less than 10,000", validationException); } return validationException; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java index e016a54c8d0ee..36f53988ec1cc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java @@ -30,7 +30,7 @@ public class PivotConfig implements Writeable, ToXContentObject { private static final String NAME = "data_frame_transform_pivot"; private final GroupConfig groups; private final AggregationConfig aggregationConfig; - private final Integer size; + private final Integer maxPageSearchSize; private static final ConstructingObjectParser STRICT_PARSER = createParser(false); private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); @@ -65,21 +65,21 @@ private static ConstructingObjectParser createParser(boolean parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGREGATIONS); parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGS); - parser.declareInt(optionalConstructorArg(), DataFrameField.SIZE); + parser.declareInt(optionalConstructorArg(), DataFrameField.MAX_PAGE_SEARCH_SIZE); return parser; } - public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig, Integer size) { + public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig, Integer maxPageSearchSize) { this.groups = ExceptionsHelper.requireNonNull(groups, DataFrameField.GROUP_BY.getPreferredName()); this.aggregationConfig = ExceptionsHelper.requireNonNull(aggregationConfig, DataFrameField.AGGREGATIONS.getPreferredName()); - this.size = size; + this.maxPageSearchSize = maxPageSearchSize; } public PivotConfig(StreamInput in) throws IOException { this.groups = new GroupConfig(in); this.aggregationConfig = new AggregationConfig(in); - this.size = in.readOptionalInt(); + this.maxPageSearchSize = in.readOptionalInt(); } @Override @@ -87,8 +87,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(DataFrameField.GROUP_BY.getPreferredName(), groups); builder.field(DataFrameField.AGGREGATIONS.getPreferredName(), aggregationConfig); - if (size != null) { - builder.field(DataFrameField.SIZE.getPreferredName(), size); + if (maxPageSearchSize != null) { + builder.field(DataFrameField.MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize); } builder.endObject(); return builder; @@ -115,7 +115,7 @@ public void toCompositeAggXContent(XContentBuilder builder, Params params) throw public void writeTo(StreamOutput out) throws IOException { groups.writeTo(out); aggregationConfig.writeTo(out); - out.writeOptionalInt(size); + out.writeOptionalInt(maxPageSearchSize); } public AggregationConfig getAggregationConfig() { @@ -127,8 +127,8 @@ public GroupConfig getGroupConfig() { } @Nullable - public Integer getSize() { - return size; + public Integer getMaxPageSearchSize() { + return maxPageSearchSize; } @Override @@ -145,12 +145,12 @@ public boolean equals(Object other) { return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig) - && Objects.equals(this.size, that.size); + && Objects.equals(this.maxPageSearchSize, that.maxPageSearchSize); } @Override public int hashCode() { - return Objects.hash(groups, aggregationConfig, size); + return Objects.hash(groups, aggregationConfig, maxPageSearchSize); } public boolean isValid() { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java index d33d960620db0..8205f2576da68 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java @@ -76,7 +76,7 @@ public void deduceMappings(Client client, SourceConfig sourceConfig, final Actio * the page size, the type of aggregations and the data. As the page size is the number of buckets we return * per page the page size is a multiplier for the costs of aggregating bucket. * - * The user may set a maximum in the {@link PivotConfig#getSize()}, but if that is not provided, + * The user may set a maximum in the {@link PivotConfig#getMaxPageSearchSize()}, but if that is not provided, * the default {@link Pivot#DEFAULT_INITIAL_PAGE_SIZE} is used. * * In future we might inspect the configuration and base the initial size on the aggregations used. @@ -84,7 +84,7 @@ public void deduceMappings(Client client, SourceConfig sourceConfig, final Actio * @return the page size */ public int getInitialPageSize() { - return config.getSize() == null ? DEFAULT_INITIAL_PAGE_SIZE : config.getSize(); + return config.getMaxPageSearchSize() == null ? DEFAULT_INITIAL_PAGE_SIZE : config.getMaxPageSearchSize(); } public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map position, int pageSize) { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index a6330dad8b12e..718365589eb22 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -305,7 +305,7 @@ setup: --- "Test put config with invalid pivot size": - do: - catch: /pivot\.size \[5\] must be greater than 10 and less than 10,000/ + catch: /pivot\.max_page_search_size \[5\] must be greater than 10 and less than 10,000/ data_frame.put_data_frame_transform: transform_id: "airline-transform" body: > @@ -313,13 +313,13 @@ setup: "source": { "index": "airline-data" }, "dest": { "index": "airline-dest-index" }, "pivot": { - "size": 5, + "max_page_search_size": 5, "group_by": { "airline": {"terms": {"field": "airline"}}}, "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} } } - do: - catch: /pivot\.size \[15000\] must be greater than 10 and less than 10,000/ + catch: /pivot\.max_page_search_size \[15000\] must be greater than 10 and less than 10,000/ data_frame.put_data_frame_transform: transform_id: "airline-transform" body: > @@ -327,7 +327,7 @@ setup: "source": { "index": "airline-data" }, "dest": { "index": "airline-dest-index" }, "pivot": { - "size": 15000, + "max_page_search_size": 15000, "group_by": { "airline": {"terms": {"field": "airline"}}}, "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} } From d06a5b0cc14e3d7f889a3902fcf2a770e3fcabc4 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 8 May 2019 16:32:35 -0500 Subject: [PATCH 3/4] fixing ctor usage --- .../dataframe/integration/DataFrameTransformProgressIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java index d338d6949f07b..194d35e8ba636 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java @@ -130,7 +130,7 @@ public void testGetProgress() throws Exception { AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("stars")); AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs); - PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig); + PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null); DataFrameTransformConfig config = new DataFrameTransformConfig("get_progress_transform", sourceConfig, destConfig, @@ -149,7 +149,7 @@ public void testGetProgress() throws Exception { QueryConfig queryConfig = new QueryConfig(Collections.emptyMap(), QueryBuilders.termQuery("user_id", "user_26")); - pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig); + pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null); sourceConfig = new SourceConfig(new String[]{REVIEWS_INDEX_NAME}, queryConfig); config = new DataFrameTransformConfig("get_progress_transform", sourceConfig, From 7f00700a79c626a73eb77da4f773fd9e00e193b7 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 9 May 2019 14:46:20 -0500 Subject: [PATCH 4/4] adjust hlrc for field name change --- .../transforms/pivot/PivotConfig.java | 38 +++++++++---------- .../DataFrameTransformDocumentationIT.java | 2 +- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java index 75fbc9c81e2d8..6fdbeb8a43a20 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java @@ -39,11 +39,11 @@ public class PivotConfig implements ToXContentObject { private static final ParseField GROUP_BY = new ParseField("group_by"); private static final ParseField AGGREGATIONS = new ParseField("aggregations"); - private static final ParseField SIZE = new ParseField("size"); + private static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size"); private final GroupConfig groups; private final AggregationConfig aggregationConfig; - private final Integer size; + private final Integer maxPageSearchSize; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("pivot_config", true, args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1], (Integer) args[2])); @@ -51,17 +51,17 @@ public class PivotConfig implements ToXContentObject { static { PARSER.declareObject(constructorArg(), (p, c) -> (GroupConfig.fromXContent(p)), GROUP_BY); PARSER.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p), AGGREGATIONS); - PARSER.declareInt(optionalConstructorArg(), SIZE); + PARSER.declareInt(optionalConstructorArg(), MAX_PAGE_SEARCH_SIZE); } public static PivotConfig fromXContent(final XContentParser parser) { return PARSER.apply(parser, null); } - PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig, Integer size) { + PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig, Integer maxPageSearchSize) { this.groups = groups; this.aggregationConfig = aggregationConfig; - this.size = size; + this.maxPageSearchSize = maxPageSearchSize; } @Override @@ -69,8 +69,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(GROUP_BY.getPreferredName(), groups); builder.field(AGGREGATIONS.getPreferredName(), aggregationConfig); - if (size != null) { - builder.field(SIZE.getPreferredName(), size); + if (maxPageSearchSize != null) { + builder.field(MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize); } builder.endObject(); return builder; @@ -84,8 +84,8 @@ public GroupConfig getGroupConfig() { return groups; } - public Integer getSize() { - return size; + public Integer getMaxPageSearchSize() { + return maxPageSearchSize; } @Override @@ -102,12 +102,12 @@ public boolean equals(Object other) { return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig) - && Objects.equals(this.size, that.size); + && Objects.equals(this.maxPageSearchSize, that.maxPageSearchSize); } @Override public int hashCode() { - return Objects.hash(groups, aggregationConfig, size); + return Objects.hash(groups, aggregationConfig, maxPageSearchSize); } public static Builder builder() { @@ -117,7 +117,7 @@ public static Builder builder() { public static class Builder { private GroupConfig groups; private AggregationConfig aggregationConfig; - private Integer size; + private Integer maxPageSearchSize; /** * Set how to group the source data @@ -150,21 +150,21 @@ public Builder setAggregations(AggregatorFactories.Builder aggregations) { } /** - * Sets the paging maximum paging size that date frame transform can use when + * Sets the paging maximum paging maxPageSearchSize that date frame transform can use when * pulling the data from the source index. * - * If OOM is triggered, the paging size is dynamically reduced so that the transform can continue to gather data. + * If OOM is triggered, the paging maxPageSearchSize is dynamically reduced so that the transform can continue to gather data. * - * @param size Integer value between 10 and 10_000 - * @return the {@link Builder} with the paging size set. + * @param maxPageSearchSize Integer value between 10 and 10_000 + * @return the {@link Builder} with the paging maxPageSearchSize set. */ - public Builder setSize(Integer size) { - this.size = size; + public Builder setMaxPageSearchSize(Integer maxPageSearchSize) { + this.maxPageSearchSize = maxPageSearchSize; return this; } public PivotConfig build() { - return new PivotConfig(groups, aggregationConfig, size); + return new PivotConfig(groups, aggregationConfig, maxPageSearchSize); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index f541689922819..4bd78f12ae4ce 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -139,7 +139,7 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException PivotConfig pivotConfig = PivotConfig.builder() .setGroups(groupConfig) // <1> .setAggregationConfig(aggConfig) // <2> - .setSize(1000) // <3> + .setMaxPageSearchSize(1000) // <3> .build(); // end::put-data-frame-transform-pivot-config // tag::put-data-frame-transform-config