diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java index 0c3a6e3ea890b..6fdbeb8a43a20 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java @@ -39,25 +39,29 @@ public class PivotConfig implements ToXContentObject { private static final ParseField GROUP_BY = new ParseField("group_by"); private static final ParseField AGGREGATIONS = new ParseField("aggregations"); + private static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size"); private final GroupConfig groups; private final AggregationConfig aggregationConfig; + private final Integer maxPageSearchSize; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("pivot_config", true, - args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1])); + args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1], (Integer) args[2])); static { PARSER.declareObject(constructorArg(), (p, c) -> (GroupConfig.fromXContent(p)), GROUP_BY); PARSER.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p), AGGREGATIONS); + PARSER.declareInt(optionalConstructorArg(), MAX_PAGE_SEARCH_SIZE); } public static PivotConfig fromXContent(final XContentParser parser) { return PARSER.apply(parser, null); } - PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig) { + PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig, Integer maxPageSearchSize) { this.groups = groups; this.aggregationConfig = aggregationConfig; + this.maxPageSearchSize = maxPageSearchSize; } @Override @@ -65,6 +69,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(GROUP_BY.getPreferredName(), groups); builder.field(AGGREGATIONS.getPreferredName(), aggregationConfig); + if (maxPageSearchSize != null) { + builder.field(MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize); + } builder.endObject(); return builder; } @@ -77,6 +84,10 @@ public GroupConfig getGroupConfig() { return groups; } + public Integer getMaxPageSearchSize() { + return maxPageSearchSize; + } + @Override public boolean equals(Object other) { if (this == other) { @@ -89,12 +100,14 @@ public boolean equals(Object other) { final PivotConfig that = (PivotConfig) other; - return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig); + return Objects.equals(this.groups, that.groups) + && Objects.equals(this.aggregationConfig, that.aggregationConfig) + && Objects.equals(this.maxPageSearchSize, that.maxPageSearchSize); } @Override public int hashCode() { - return Objects.hash(groups, aggregationConfig); + return Objects.hash(groups, aggregationConfig, maxPageSearchSize); } public static Builder builder() { @@ -104,6 +117,7 @@ public static Builder builder() { public static class Builder { private GroupConfig groups; private AggregationConfig aggregationConfig; + private Integer maxPageSearchSize; /** * Set how to group the source data @@ -135,8 +149,22 @@ public Builder setAggregations(AggregatorFactories.Builder aggregations) { return this; } + /** + * Sets the paging maximum paging maxPageSearchSize that date frame transform can use when + * pulling the data from the source index. + * + * If OOM is triggered, the paging maxPageSearchSize is dynamically reduced so that the transform can continue to gather data. + * + * @param maxPageSearchSize Integer value between 10 and 10_000 + * @return the {@link Builder} with the paging maxPageSearchSize set. + */ + public Builder setMaxPageSearchSize(Integer maxPageSearchSize) { + this.maxPageSearchSize = maxPageSearchSize; + return this; + } + public PivotConfig build() { - return new PivotConfig(groups, aggregationConfig); + return new PivotConfig(groups, aggregationConfig, maxPageSearchSize); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfigTests.java index d2e036d9f1ad2..5cafcb9f419b5 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfigTests.java @@ -32,7 +32,9 @@ public class PivotConfigTests extends AbstractXContentTestCase { public static PivotConfig randomPivotConfig() { - return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig()); + return new PivotConfig(GroupConfigTests.randomGroupConfig(), + AggregationConfigTests.randomAggregationConfig(), + randomBoolean() ? null : randomIntBetween(10, 10_000)); } @Override diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index 07713d5371460..6f7832cbf3cff 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -138,8 +138,9 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException // end::put-data-frame-transform-agg-config // tag::put-data-frame-transform-pivot-config PivotConfig pivotConfig = PivotConfig.builder() - .setGroups(groupConfig) - .setAggregationConfig(aggConfig) + .setGroups(groupConfig) // <1> + .setAggregationConfig(aggConfig) // <2> + .setMaxPageSearchSize(1000) // <3> .build(); // end::put-data-frame-transform-pivot-config // tag::put-data-frame-transform-config diff --git a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc index bb1b20aaa1a52..567449c9c25b1 100644 --- a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc +++ b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc @@ -66,6 +66,11 @@ Defines the pivot function `group by` fields and the aggregation to reduce the d -------------------------------------------------- include-tagged::{doc-tests-file}[{api}-pivot-config] -------------------------------------------------- +<1> The `GroupConfig` to use in the pivot +<2> The aggregations to use +<3> The maximum paging size for the transform when pulling data +from the source. The size dynamically adjusts as the transform +is running to recover from and prevent OOM issues. ===== GroupConfig The grouping terms. Defines the group by and destination fields diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index 71bf14cdeb4a5..c61ed2ddde8be 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -27,6 +27,7 @@ public final class DataFrameField { public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DESTINATION = new ParseField("dest"); public static final ParseField FORCE = new ParseField("force"); + public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size"); /** * Fields for checkpointing diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java index 059bad3494c07..2608fb87761f9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java @@ -56,6 +56,14 @@ public static Request fromXContent(final XContentParser parser, final String id) @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; + if(config.getPivotConfig() != null + && config.getPivotConfig().getMaxPageSearchSize() != null + && (config.getPivotConfig().getMaxPageSearchSize() < 10 || config.getPivotConfig().getMaxPageSearchSize() > 10_000)) { + validationException = addValidationError( + "pivot.max_page_search_size [" + + config.getPivotConfig().getMaxPageSearchSize() + "] must be greater than 10 and less than 10,000", + validationException); + } for(String failure : config.getPivotConfig().aggFieldValidation()) { validationException = addValidationError(failure, validationException); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java index 79a0a7fc1bfa8..ab2f7d489ac9a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.dataframe.transforms.pivot; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -35,6 +36,7 @@ public class PivotConfig implements Writeable, ToXContentObject { private static final String NAME = "data_frame_transform_pivot"; private final GroupConfig groups; private final AggregationConfig aggregationConfig; + private final Integer maxPageSearchSize; private static final ConstructingObjectParser STRICT_PARSER = createParser(false); private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); @@ -61,7 +63,7 @@ private static ConstructingObjectParser createParser(boolean throw new IllegalArgumentException("Required [aggregations]"); } - return new PivotConfig(groups, aggregationConfig); + return new PivotConfig(groups, aggregationConfig, (Integer)args[3]); }); parser.declareObject(constructorArg(), @@ -69,18 +71,21 @@ private static ConstructingObjectParser createParser(boolean parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGREGATIONS); parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGS); + parser.declareInt(optionalConstructorArg(), DataFrameField.MAX_PAGE_SEARCH_SIZE); return parser; } - public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig) { + public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig, Integer maxPageSearchSize) { this.groups = ExceptionsHelper.requireNonNull(groups, DataFrameField.GROUP_BY.getPreferredName()); this.aggregationConfig = ExceptionsHelper.requireNonNull(aggregationConfig, DataFrameField.AGGREGATIONS.getPreferredName()); + this.maxPageSearchSize = maxPageSearchSize; } public PivotConfig(StreamInput in) throws IOException { this.groups = new GroupConfig(in); this.aggregationConfig = new AggregationConfig(in); + this.maxPageSearchSize = in.readOptionalInt(); } @Override @@ -88,6 +93,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(DataFrameField.GROUP_BY.getPreferredName(), groups); builder.field(DataFrameField.AGGREGATIONS.getPreferredName(), aggregationConfig); + if (maxPageSearchSize != null) { + builder.field(DataFrameField.MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize); + } builder.endObject(); return builder; } @@ -113,6 +121,7 @@ public void toCompositeAggXContent(XContentBuilder builder, Params params) throw public void writeTo(StreamOutput out) throws IOException { groups.writeTo(out); aggregationConfig.writeTo(out); + out.writeOptionalInt(maxPageSearchSize); } public AggregationConfig getAggregationConfig() { @@ -123,6 +132,11 @@ public GroupConfig getGroupConfig() { return groups; } + @Nullable + public Integer getMaxPageSearchSize() { + return maxPageSearchSize; + } + @Override public boolean equals(Object other) { if (this == other) { @@ -135,12 +149,14 @@ public boolean equals(Object other) { final PivotConfig that = (PivotConfig) other; - return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig); + return Objects.equals(this.groups, that.groups) + && Objects.equals(this.aggregationConfig, that.aggregationConfig) + && Objects.equals(this.maxPageSearchSize, that.maxPageSearchSize); } @Override public int hashCode() { - return Objects.hash(groups, aggregationConfig); + return Objects.hash(groups, aggregationConfig, maxPageSearchSize); } public boolean isValid() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfigTests.java index 342e007f21284..2f93f50d4d136 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfigTests.java @@ -24,11 +24,15 @@ public class PivotConfigTests extends AbstractSerializingDataFrameTestCase { public static PivotConfig randomPivotConfig() { - return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig()); + return new PivotConfig(GroupConfigTests.randomGroupConfig(), + AggregationConfigTests.randomAggregationConfig(), + randomBoolean() ? null : randomIntBetween(10, 10_000)); } public static PivotConfig randomInvalidPivotConfig() { - return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomInvalidAggregationConfig()); + return new PivotConfig(GroupConfigTests.randomGroupConfig(), + AggregationConfigTests.randomInvalidAggregationConfig(), + randomBoolean() ? null : randomIntBetween(10, 10_000)); } @Override diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java index ba6a6137789a3..3a6ab2e5b71d2 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java @@ -172,7 +172,13 @@ protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregat protected PivotConfig createPivotConfig(Map groups, AggregatorFactories.Builder aggregations) throws Exception { - return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations)); + return createPivotConfig(groups, aggregations, null); + } + + protected PivotConfig createPivotConfig(Map groups, + AggregatorFactories.Builder aggregations, + Integer size) throws Exception { + return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations), size); } protected DataFrameTransformConfig createTransformConfig(String id, diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java index d338d6949f07b..194d35e8ba636 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java @@ -130,7 +130,7 @@ public void testGetProgress() throws Exception { AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("stars")); AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs); - PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig); + PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null); DataFrameTransformConfig config = new DataFrameTransformConfig("get_progress_transform", sourceConfig, destConfig, @@ -149,7 +149,7 @@ public void testGetProgress() throws Exception { QueryConfig queryConfig = new QueryConfig(Collections.emptyMap(), QueryBuilders.termQuery("user_id", "user_26")); - pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig); + pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null); sourceConfig = new SourceConfig(new String[]{REVIEWS_INDEX_NAME}, queryConfig); config = new DataFrameTransformConfig("get_progress_transform", sourceConfig, diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java index 0e5231442d18b..8205f2576da68 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java @@ -76,13 +76,15 @@ public void deduceMappings(Client client, SourceConfig sourceConfig, final Actio * the page size, the type of aggregations and the data. As the page size is the number of buckets we return * per page the page size is a multiplier for the costs of aggregating bucket. * - * Initially this returns a default, in future it might inspect the configuration and base the initial size - * on the aggregations used. + * The user may set a maximum in the {@link PivotConfig#getMaxPageSearchSize()}, but if that is not provided, + * the default {@link Pivot#DEFAULT_INITIAL_PAGE_SIZE} is used. + * + * In future we might inspect the configuration and base the initial size on the aggregations used. * * @return the page size */ public int getInitialPageSize() { - return DEFAULT_INITIAL_PAGE_SIZE; + return config.getMaxPageSearchSize() == null ? DEFAULT_INITIAL_PAGE_SIZE : config.getMaxPageSearchSize(); } public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map position, int pageSize) { diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java index f3f3255f07a6d..43198c6edfcf3 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java @@ -23,7 +23,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfigTests; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfigTests; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; @@ -39,7 +41,10 @@ import java.util.function.Consumer; import java.util.function.Function; +import static org.elasticsearch.xpack.core.dataframe.transforms.DestConfigTests.randomDestConfig; +import static org.elasticsearch.xpack.core.dataframe.transforms.SourceConfigTests.randomSourceConfig; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -169,9 +174,15 @@ public void setUpMocks() { } public void testPageSizeAdapt() throws InterruptedException { - DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfig(); + Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000); + DataFrameTransformConfig config = new DataFrameTransformConfig(randomAlphaOfLength(10), + randomSourceConfig(), + randomDestConfig(), + null, + new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize), + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - + final long initialPageSize = pageSize == null ? Pivot.DEFAULT_INITIAL_PAGE_SIZE : pageSize; Function searchFunction = searchRequest -> { throw new SearchPhaseExecutionException("query", "Partial shards failure", new ShardSearchFailure[] { new ShardSearchFailure(new CircuitBreakingException("to much memory", 110, 100, Durability.TRANSIENT)) }); @@ -179,9 +190,7 @@ public void testPageSizeAdapt() throws InterruptedException { Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - Consumer failureConsumer = e -> { - fail("expected circuit breaker exception to be handled"); - }; + Consumer failureConsumer = e -> fail("expected circuit breaker exception to be handled"); final ExecutorService executor = Executors.newFixedThreadPool(1); try { @@ -197,8 +206,8 @@ public void testPageSizeAdapt() throws InterruptedException { latch.countDown(); awaitBusy(() -> indexer.getState() == IndexerState.STOPPED); long pageSizeAfterFirstReduction = indexer.getPageSize(); - assertTrue(Pivot.DEFAULT_INITIAL_PAGE_SIZE > pageSizeAfterFirstReduction); - assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE); + assertThat(initialPageSize, greaterThan(pageSizeAfterFirstReduction)); + assertThat(pageSizeAfterFirstReduction, greaterThan((long)DataFrameIndexer.MINIMUM_PAGE_SIZE)); // run indexer a 2nd time final CountDownLatch secondRunLatch = indexer.newLatch(1); @@ -211,8 +220,8 @@ public void testPageSizeAdapt() throws InterruptedException { awaitBusy(() -> indexer.getState() == IndexerState.STOPPED); // assert that page size has been reduced again - assertTrue(pageSizeAfterFirstReduction > indexer.getPageSize()); - assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE); + assertThat(pageSizeAfterFirstReduction, greaterThan((long)indexer.getPageSize())); + assertThat(pageSizeAfterFirstReduction, greaterThan((long)DataFrameIndexer.MINIMUM_PAGE_SIZE)); } finally { executor.shutdownNow(); diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java index 4c434cdbee7fd..20ea84502ed82 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java @@ -107,6 +107,16 @@ public void testValidateNonExistingIndex() throws Exception { assertInvalidTransform(client, source, pivot); } + public void testInitialPageSize() throws Exception { + int expectedPageSize = 1000; + + Pivot pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), expectedPageSize)); + assertThat(pivot.getInitialPageSize(), equalTo(expectedPageSize)); + + pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null)); + assertThat(pivot.getInitialPageSize(), equalTo(Pivot.DEFAULT_INITIAL_PAGE_SIZE)); + } + public void testSearchFailure() throws Exception { // test a failure during the search operation, transform creation fails if // search has failures although they might just be temporary @@ -177,11 +187,11 @@ protected void } private PivotConfig getValidPivotConfig() throws IOException { - return new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig()); + return new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null); } private PivotConfig getValidPivotConfig(AggregationConfig aggregationConfig) throws IOException { - return new PivotConfig(GroupConfigTests.randomGroupConfig(), aggregationConfig); + return new PivotConfig(GroupConfigTests.randomGroupConfig(), aggregationConfig, null); } private AggregationConfig getValidAggregationConfig() throws IOException { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index 40af091a91bd9..65945b6ab7429 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -303,6 +303,36 @@ setup: } } --- +"Test put config with invalid pivot size": + - do: + catch: /pivot\.max_page_search_size \[5\] must be greater than 10 and less than 10,000/ + data_frame.put_data_frame_transform: + transform_id: "airline-transform" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-dest-index" }, + "pivot": { + "max_page_search_size": 5, + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - do: + catch: /pivot\.max_page_search_size \[15000\] must be greater than 10 and less than 10,000/ + data_frame.put_data_frame_transform: + transform_id: "airline-transform" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-dest-index" }, + "pivot": { + "max_page_search_size": 15000, + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } +--- "Test creation failures due to duplicate and conflicting field names": - do: catch: /duplicate field \[airline\] detected/