Skip to content

Commit

Permalink
pluggable deciders for concurrent search
Browse files Browse the repository at this point in the history
  • Loading branch information
Ganesh Ramadurai committed Aug 26, 2024
1 parent f195285 commit 708a158
Show file tree
Hide file tree
Showing 25 changed files with 936 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.List;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
import static org.opensearch.search.aggregations.AggregationBuilders.global;
import static org.opensearch.search.aggregations.AggregationBuilders.stats;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
Expand All @@ -82,7 +83,8 @@ public AggregationsIntegrationIT(Settings staticSettings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE.getKey(), "auto").build() }
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
import static org.opensearch.search.aggregations.AggregationBuilders.histogram;
import static org.opensearch.search.aggregations.AggregationBuilders.missing;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
Expand All @@ -70,7 +71,8 @@ public CombiIT(Settings staticSettings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE.getKey(), "auto").build() }
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
import static org.opensearch.search.aggregations.AggregationBuilders.extendedStats;
import static org.opensearch.search.aggregations.AggregationBuilders.filter;
import static org.opensearch.search.aggregations.AggregationBuilders.histogram;
Expand Down Expand Up @@ -103,7 +104,8 @@ public EquivalenceIT(Settings staticSettings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE.getKey(), "auto").build() }
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
import static org.opensearch.search.aggregations.AggregationBuilders.sum;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
import static org.opensearch.search.aggregations.PipelineAggregatorBuilders.maxBucket;
Expand All @@ -66,7 +67,8 @@ public MetadataIT(Settings staticSettings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE.getKey(), "auto").build() }
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Collection;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
import static org.opensearch.search.aggregations.AggregationBuilders.cardinality;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.search.aggregations.AggregationBuilders.geoCentroid;
Expand All @@ -74,7 +75,8 @@ public MissingValueIT(Settings staticSettings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE.getKey(), "auto").build() }
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Set;

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
Expand All @@ -68,7 +69,8 @@ public DuelScrollIT(Settings settings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE.getKey(), "auto").build() }
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.queryStringQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
Expand All @@ -100,7 +101,8 @@ public SearchScrollIT(Settings settings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE.getKey(), "auto").build() }
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
Expand All @@ -66,7 +67,8 @@ public SearchScrollWithFailingNodesIT(Settings settings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE.getKey(), "auto").build() }
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,8 +748,9 @@ public void apply(Settings value, Settings current, Settings previous) {
IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT,

// Concurrent segment search settings
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, // deprecated
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE,

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING,

// Settings for concurrent segment search
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING,
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING, // deprecated
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_MODE,
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_MAX_SLICE_COUNT,
IndexSettings.ALLOW_DERIVED_FIELDS,

Expand Down
26 changes: 25 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_ALL;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_AUTO;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_NONE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE;

/**
Expand Down Expand Up @@ -689,7 +692,28 @@ public static IndexMergePolicy fromString(String text) {
"index.search.concurrent_segment_search.enabled",
false,
Property.IndexScope,
Property.Dynamic
Property.Dynamic,
Property.Deprecated
);

public static final Setting<String> INDEX_CONCURRENT_SEGMENT_SEARCH_MODE = Setting.simpleString(
"index.search.concurrent_segment_search.mode",
INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.get(Settings.EMPTY)
? CONCURRENT_SEGMENT_SEARCH_MODE_ALL
: CONCURRENT_SEGMENT_SEARCH_MODE_NONE,
value -> {
switch (value) {
case CONCURRENT_SEGMENT_SEARCH_MODE_ALL:
case CONCURRENT_SEGMENT_SEARCH_MODE_NONE:
case CONCURRENT_SEGMENT_SEARCH_MODE_AUTO:
// valid setting
break;
default:
throw new IllegalArgumentException("Setting value must be one of [all, none, auto]");
}
},
Property.Dynamic,
Property.IndexScope
);

public static final Setting<Integer> INDEX_CONCURRENT_SEGMENT_SEARCH_MAX_SLICE_COUNT = Setting.intSetting(
Expand Down
10 changes: 7 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.query.QueryPhase;
Expand Down Expand Up @@ -1314,7 +1315,8 @@ protected Node(
responseCollectorService,
circuitBreakerService,
searchModule.getIndexSearcherExecutor(threadPool),
taskResourceTrackingService
taskResourceTrackingService,
searchModule.getConcurrentSearchDecidersList()
);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
Expand Down Expand Up @@ -1970,7 +1972,8 @@ protected SearchService newSearchService(
ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor,
TaskResourceTrackingService taskResourceTrackingService
TaskResourceTrackingService taskResourceTrackingService,
Collection<ConcurrentSearchDecider> concurrentSearchDecidersList
) {
return new SearchService(
clusterService,
Expand All @@ -1983,7 +1986,8 @@ protected SearchService newSearchService(
responseCollectorService,
circuitBreakerService,
indexSearcherExecutor,
taskResourceTrackingService
taskResourceTrackingService,
concurrentSearchDecidersList
);
}

Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.query.QueryPhaseSearcher;
Expand All @@ -77,6 +78,8 @@
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -138,6 +141,14 @@ default Map<String, Highlighter> getHighlighters() {
return emptyMap();
}

/**
* Allows plugins to create concurrent search deciders
* @return Collection or ConcurrentSearchDeciders
*/
default Collection<ConcurrentSearchDecider> createConcurrentSearchDeciders() {
return Collections.emptyList();
}

/**
* The new {@link Suggester}s defined by this plugin.
*/
Expand Down
Loading

0 comments on commit 708a158

Please sign in to comment.