Skip to content

Commit

Permalink
Introduce QueryPhaseSearcher extension point (SearchPlugin) (#1931)
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored Mar 22, 2022
1 parent bd2d935 commit 82fb7ab
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 21 deletions.
14 changes: 11 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.snapshots.RestoreService;
import org.opensearch.snapshots.SnapshotShardsService;
Expand Down Expand Up @@ -210,6 +211,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.UnaryOperator;
Expand Down Expand Up @@ -849,9 +851,11 @@ protected Node(
threadPool,
scriptService,
bigArrays,
searchModule.getQueryPhase(),
searchModule.getFetchPhase(),
responseCollectorService,
circuitBreakerService
circuitBreakerService,
searchModule.getIndexSearcherExecutor(threadPool)
);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
Expand Down Expand Up @@ -1407,19 +1411,23 @@ protected SearchService newSearchService(
ThreadPool threadPool,
ScriptService scriptService,
BigArrays bigArrays,
QueryPhase queryPhase,
FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor
) {
return new SearchService(
clusterService,
indicesService,
threadPool,
scriptService,
bigArrays,
queryPhase,
fetchPhase,
responseCollectorService,
circuitBreakerService
circuitBreakerService,
indexSearcherExecutor
);
}

Expand Down
36 changes: 36 additions & 0 deletions server/src/main/java/org/opensearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.plugins;

import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.opensearch.common.CheckedFunction;
Expand All @@ -40,6 +41,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.lucene.search.function.ScoreFunction;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.ContextParser;
import org.opensearch.common.xcontent.XContent;
import org.opensearch.common.xcontent.XContentParser;
Expand All @@ -61,18 +63,22 @@
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.search.rescore.Rescorer;
import org.opensearch.search.rescore.RescorerBuilder;
import org.opensearch.search.sort.SortBuilder;
import org.opensearch.search.sort.SortParser;
import org.opensearch.search.suggest.Suggest;
import org.opensearch.search.suggest.Suggester;
import org.opensearch.search.suggest.SuggestionBuilder;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand Down Expand Up @@ -178,6 +184,36 @@ default List<RescorerSpec<?>> getRescorers() {
return emptyList();
}

/**
* The new {@link QueryPhaseSearcher} added by this plugin. At the moment, only one {@link QueryPhaseSearcher} is supported per
* instance, the {@link IllegalStateException} is going to be thrown if more then one plugin tries to register
* {@link QueryPhaseSearcher} implementation.
*/
default Optional<QueryPhaseSearcher> getQueryPhaseSearcher() {
return Optional.empty();
}

/**
* The executor service provider (registered through {@link Plugin#getExecutorBuilders(Settings)} to be used at search
* time by {@link IndexSearcher}. The {@link IllegalStateException} is going to be thrown if more then one
* plugin tries to register index searcher executor.
*/
default Optional<ExecutorServiceProvider> getIndexSearcherExecutorProvider() {
return Optional.empty();
}

/**
* Executor service provider
*/
interface ExecutorServiceProvider {
/**
* Provides an executor service instance
* @param threadPool thread pool
* @return executor service instance
*/
ExecutorService getExecutor(ThreadPool threadPool);
}

/**
* Specification of custom {@link ScoreFunction}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.LongSupplier;

final class DefaultSearchContext extends SearchContext {
Expand Down Expand Up @@ -177,7 +178,8 @@ final class DefaultSearchContext extends SearchContext {
FetchPhase fetchPhase,
boolean lowLevelCancellation,
Version minNodeVersion,
boolean validate
boolean validate,
Executor executor
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand All @@ -198,7 +200,8 @@ final class DefaultSearchContext extends SearchContext {
engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation
lowLevelCancellation,
executor
);
this.relativeTimeSupplier = relativeTimeSupplier;
this.timeout = timeout;
Expand Down
52 changes: 52 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.search.BooleanQuery;
import org.opensearch.common.NamedRegistry;
import org.opensearch.common.Nullable;
import org.opensearch.common.ParseField;
import org.opensearch.common.geo.GeoShapeType;
import org.opensearch.common.geo.ShapesAvailability;
Expand Down Expand Up @@ -273,6 +274,8 @@
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.fetch.subphase.highlight.PlainHighlighter;
import org.opensearch.search.fetch.subphase.highlight.UnifiedHighlighter;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.search.rescore.QueryRescorerBuilder;
import org.opensearch.search.rescore.RescorerBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
Expand All @@ -293,11 +296,14 @@
import org.opensearch.search.suggest.phrase.StupidBackoff;
import org.opensearch.search.suggest.term.TermSuggestion;
import org.opensearch.search.suggest.term.TermSuggestionBuilder;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -329,6 +335,8 @@ public class SearchModule {
private final List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>();
private final List<NamedXContentRegistry.Entry> namedXContents = new ArrayList<>();
private final ValuesSourceRegistry valuesSourceRegistry;
private final QueryPhaseSearcher queryPhaseSearcher;
private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider;

/**
* Constructs a new SearchModule object
Expand All @@ -355,6 +363,8 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
registerSearchExts(plugins);
registerShapes();
registerIntervalsSourceProviders();
queryPhaseSearcher = registerQueryPhaseSearcher(plugins);
indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins);
namedWriteables.addAll(SortValue.namedWriteables());
}

Expand Down Expand Up @@ -1282,7 +1292,49 @@ private void registerSort(SortSpec<?> spec) {
);
}

private QueryPhaseSearcher registerQueryPhaseSearcher(List<SearchPlugin> plugins) {
QueryPhaseSearcher searcher = null;

for (SearchPlugin plugin : plugins) {
final Optional<QueryPhaseSearcher> searcherOpt = plugin.getQueryPhaseSearcher();

if (searcher == null) {
searcher = searcherOpt.orElse(null);
} else if (searcherOpt.isPresent()) {
throw new IllegalStateException("Only one QueryPhaseSearcher is allowed, but more than one are provided by the plugins");
}
}

return searcher;
}

private SearchPlugin.ExecutorServiceProvider registerIndexSearcherExecutorProvider(List<SearchPlugin> plugins) {
SearchPlugin.ExecutorServiceProvider provider = null;

for (SearchPlugin plugin : plugins) {
final Optional<SearchPlugin.ExecutorServiceProvider> providerOpt = plugin.getIndexSearcherExecutorProvider();

if (provider == null) {
provider = providerOpt.orElse(null);
} else if (providerOpt.isPresent()) {
throw new IllegalStateException(
"The index searcher executor is already assigned but more than one are provided by the plugins"
);
}
}

return provider;
}

public FetchPhase getFetchPhase() {
return new FetchPhase(fetchSubPhases);
}

public QueryPhase getQueryPhase() {
return (queryPhaseSearcher == null) ? new QueryPhase() : new QueryPhase(queryPhaseSearcher);
}

public @Nullable ExecutorService getIndexSearcherExecutor(ThreadPool pool) {
return (indexSearcherExecutorProvider == null) ? null : indexSearcherExecutorProvider.getExecutor(pool);
}
}
11 changes: 8 additions & 3 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,19 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private final AtomicInteger openScrollContexts = new AtomicInteger();
private final String sessionId = UUIDs.randomBase64UUID();
private final Executor indexSearcherExecutor;

public SearchService(
ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool,
ScriptService scriptService,
BigArrays bigArrays,
QueryPhase queryPhase,
FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor
) {
Settings settings = clusterService.getSettings();
this.threadPool = threadPool;
Expand All @@ -274,13 +277,14 @@ public SearchService(
this.scriptService = scriptService;
this.responseCollectorService = responseCollectorService;
this.bigArrays = bigArrays;
this.queryPhase = new QueryPhase();
this.queryPhase = queryPhase;
this.fetchPhase = fetchPhase;
this.multiBucketConsumerService = new MultiBucketConsumerService(
clusterService,
settings,
circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)
);
this.indexSearcherExecutor = indexSearcherExecutor;

TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
Expand Down Expand Up @@ -884,7 +888,8 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
fetchPhase,
lowLevelCancellation,
clusterService.state().nodes().getMinNodeVersion(),
validate
validate,
indexSearcherExecutor
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
Expand Down
Loading

0 comments on commit 82fb7ab

Please sign in to comment.