Skip to content

Commit

Permalink
Update AggregatorFactory to provide a method to indicate if it suppor…
Browse files Browse the repository at this point in the history
…ts concurrent search (#9469) (#9550)

* Parameterize parent-join search tests and add coverage for sparse slice case



* Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support



* Add supportsConcurrentSegmentSearch override for all AggregatorFactory concrete classes



* Addressing feedback



---------

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored Aug 28, 2023
1 parent a137cb9 commit afdd5f7
Show file tree
Hide file tree
Showing 64 changed files with 466 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Refactor Compressors from CompressorFactory to CompressorRegistry for extensibility ([#9262](https://github.com/opensearch-project/OpenSearch/pull/9262))
- Fix sort related ITs for concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9466)
- [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264))
- Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https://github.com/opensearch-project/OpenSearch/pull/9469))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,9 @@ protected Aggregator doCreateInternal(
}
return new MatrixStatsAggregator(name, typedValuesSources, searchContext, parent, multiValueMode, metadata);
}

@Override
protected boolean supportsConcurrentSegmentSearch() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,9 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) {
true
);
}

@Override
protected boolean supportsConcurrentSegmentSearch() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,9 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) {
true
);
}

@Override
protected boolean supportsConcurrentSegmentSearch() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,9 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) {
builder.register(GeoBoundsAggregationBuilder.REGISTRY_KEY, CoreValuesSourceType.GEOPOINT, GeoBoundsAggregator::new, true);
builder.register(GeoBoundsAggregationBuilder.REGISTRY_KEY, CoreValuesSourceType.GEO_SHAPE, GeoBoundsGeoShapeAggregator::new, true);
}

@Override
protected boolean supportsConcurrentSegmentSearch() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
package org.opensearch.join.aggregations;

import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.join.query.ParentChildTestCase;
import org.junit.Before;

Expand All @@ -44,6 +49,7 @@
import java.util.Set;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

/**
* Small base test-class which combines stuff used for Children and Parent aggregation tests
Expand All @@ -52,6 +58,10 @@ public abstract class AbstractParentChildTestCase extends ParentChildTestCase {
protected final Map<String, Control> categoryToControl = new HashMap<>();
protected final Map<String, ParentControl> articleToControl = new HashMap<>();

public AbstractParentChildTestCase(Settings dynamicSettings) {
super(dynamicSettings);
}

@Before
public void setupCluster() throws Exception {
assertAcked(
Expand Down Expand Up @@ -154,4 +164,38 @@ private ParentControl(String category) {
this.category = category;
}
}

// Test when there is 1 child document and 1 parent document per segment.
public void testSparseSegments() throws InterruptedException {
assertAcked(
prepareCreate("sparse").setMapping(
addFieldMappings(
buildParentJoinFieldMappingFromSimplifiedDef("join_field", true, "article", "comment"),
"commenter",
"keyword",
"category",
"keyword"
)
)
.setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
);

List<IndexRequestBuilder> requests = new ArrayList<>();
requests.add(createIndexRequest("sparse", "article", "article-0", null, "category", List.of("0")));
indexRandom(true, false, requests);
client().admin().indices().refresh(Requests.refreshRequest("sparse")).actionGet();
requests = new ArrayList<>();
requests.add(createIndexRequest("sparse", "comment", "comment-0", "article-0", "commenter", "0"));
indexRandom(true, false, requests);

SearchResponse searchResponse = getSearchRequest().get();
assertSearchResponse(searchResponse);
validateSpareSegmentsSearchResponse(searchResponse);
}

abstract SearchRequestBuilder getSearchRequest();

abstract void validateSpareSegmentsSearchResponse(SearchResponse searchResponse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@

package org.opensearch.join.aggregations;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.search.join.ScoreMode;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.search.SearchHit;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.InternalAggregation;
Expand All @@ -47,14 +51,18 @@
import org.opensearch.search.sort.SortOrder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.join.aggregations.JoinAggregationBuilders.children;
import static org.opensearch.join.query.JoinQueryBuilders.hasChildQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.sum;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
import static org.opensearch.search.aggregations.AggregationBuilders.topHits;
Expand All @@ -69,6 +77,23 @@

public class ChildrenIT extends AbstractParentChildTestCase {

public ChildrenIT(Settings settings) {
super(settings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
}

public void testChildrenAggs() throws Exception {
SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(matchQuery("randomized", true))
Expand Down Expand Up @@ -407,4 +432,18 @@ public void testPostCollectAllLeafReaders() throws Exception {
children = parents.getBuckets().get(0).getAggregations().get("child_docs");
assertThat(children.getDocCount(), equalTo(2L));
}

@Override
SearchRequestBuilder getSearchRequest() {
return client().prepareSearch("sparse")
.setSize(10000)
.setQuery(matchAllQuery())
.addAggregation(children("to_comment", "comment").subAggregation(terms("commenters").field("commenter").size(10000)));
}

@Override
void validateSpareSegmentsSearchResponse(SearchResponse searchResponse) {
Children children = searchResponse.getAggregations().get("to_comment");
assertEquals(children.getDocCount(), 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,18 @@

package org.opensearch.join.aggregations;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.search.aggregations.Aggregation;
import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.opensearch.search.aggregations.bucket.terms.Terms;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -47,15 +53,34 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.join.aggregations.JoinAggregationBuilders.parent;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
import static org.opensearch.search.aggregations.AggregationBuilders.topHits;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;

public class ParentIT extends AbstractParentChildTestCase {

public ParentIT(Settings settings) {
super(settings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
}

public void testSimpleParentAgg() throws Exception {
final SearchRequestBuilder searchRequest = client().prepareSearch("test")
.setSize(10000)
Expand Down Expand Up @@ -264,4 +289,18 @@ public void testTermsParentAggTerms() throws Exception {
}
}
}

@Override
SearchRequestBuilder getSearchRequest() {
return client().prepareSearch("sparse")
.setSize(10000)
.setQuery(matchAllQuery())
.addAggregation(parent("to_article", "comment").subAggregation(terms("category").field("category").size(10000)));
}

@Override
void validateSpareSegmentsSearchResponse(SearchResponse searchResponse) {
Parent parentAgg = searchResponse.getAggregations().get("to_article");
assertEquals(parentAgg.getDocCount(), 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

package org.opensearch.join.query;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.search.join.ScoreMode;
import org.opensearch.action.explain.ExplainResponse;
import org.opensearch.action.index.IndexRequestBuilder;
Expand All @@ -42,6 +44,7 @@
import org.opensearch.common.lucene.search.function.FunctionScoreQuery;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.IdsQueryBuilder;
Expand All @@ -65,6 +68,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -87,6 +92,7 @@
import static org.opensearch.join.query.JoinQueryBuilders.hasChildQuery;
import static org.opensearch.join.query.JoinQueryBuilders.hasParentQuery;
import static org.opensearch.join.query.JoinQueryBuilders.parentId;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
Expand All @@ -100,6 +106,23 @@

public class ChildQuerySearchIT extends ParentChildTestCase {

public ChildQuerySearchIT(Settings settings) {
super(settings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
}

public void testMultiLevelChild() throws Exception {
assertAcked(
prepareCreate("test").setMapping(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@

package org.opensearch.join.query;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.search.join.ScoreMode;
import org.apache.lucene.util.ArrayUtil;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.InnerHitBuilder;
Expand All @@ -54,6 +58,7 @@
import org.opensearch.search.sort.SortOrder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -73,6 +78,7 @@
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.join.query.JoinQueryBuilders.hasChildQuery;
import static org.opensearch.join.query.JoinQueryBuilders.hasParentQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
Expand All @@ -87,6 +93,23 @@

public class InnerHitsIT extends ParentChildTestCase {

public InnerHitsIT(Settings settings) {
super(settings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
Expand Down
Loading

0 comments on commit afdd5f7

Please sign in to comment.