Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce QueryPhaseSearcher extension point (SearchPlugin) #1931

Merged
merged 1 commit into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.snapshots.RestoreService;
import org.opensearch.snapshots.SnapshotShardsService;
Expand Down Expand Up @@ -210,6 +211,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.UnaryOperator;
Expand Down Expand Up @@ -849,9 +851,11 @@ protected Node(
threadPool,
scriptService,
bigArrays,
searchModule.getQueryPhase(),
searchModule.getFetchPhase(),
responseCollectorService,
circuitBreakerService
circuitBreakerService,
searchModule.getIndexSearcherExecutor(threadPool)
);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
Expand Down Expand Up @@ -1407,19 +1411,23 @@ protected SearchService newSearchService(
ThreadPool threadPool,
ScriptService scriptService,
BigArrays bigArrays,
QueryPhase queryPhase,
FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor
) {
return new SearchService(
clusterService,
indicesService,
threadPool,
scriptService,
bigArrays,
queryPhase,
fetchPhase,
responseCollectorService,
circuitBreakerService
circuitBreakerService,
indexSearcherExecutor
);
}

Expand Down
36 changes: 36 additions & 0 deletions server/src/main/java/org/opensearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.plugins;

import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.opensearch.common.CheckedFunction;
Expand All @@ -40,6 +41,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.lucene.search.function.ScoreFunction;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.ContextParser;
import org.opensearch.common.xcontent.XContent;
import org.opensearch.common.xcontent.XContentParser;
Expand All @@ -61,18 +63,22 @@
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.search.rescore.Rescorer;
import org.opensearch.search.rescore.RescorerBuilder;
import org.opensearch.search.sort.SortBuilder;
import org.opensearch.search.sort.SortParser;
import org.opensearch.search.suggest.Suggest;
import org.opensearch.search.suggest.Suggester;
import org.opensearch.search.suggest.SuggestionBuilder;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand Down Expand Up @@ -178,6 +184,36 @@ default List<RescorerSpec<?>> getRescorers() {
return emptyList();
}

/**
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nknize @andrross guys would really appreciate the feedback if I am on the right track here, moving towards sandboxing concurrent search implementation (#1500) .

Basically, the idea is to provide the extension points to allow plugging in different searcher implementation(s) at QueryPhase time (the default would be the current sequential scan). In this case the core would know nothing about concurrent / non-concurrent nature of the search over Apache Lucene segments. But the sandbox plugin, if installed, could tweak that (even on per index basis) and substitute the searcher with concurrent one.

Thank you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long-term, is it possible for concurrent search to become the only implementation, even if there are some cases where it is configured to run with a single thread?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, yeah, there are two flows to address (aggregations and forced early termination) but I don't see the reasons why it couldn't become the only implementation in future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My initial thought is that it would be good to get towards a single implementation for something like this and so it might not be worth it to make it pluggable at this point. Would love to get Nick's @nknize opinion on this as well.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the initial implementation (#1500) is doing exactly that, but @nknize had the concerns and suggested to sandbox the changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol, then let's definitely get Nick's input on this!

* The new {@link QueryPhaseSearcher} added by this plugin. At the moment, only one {@link QueryPhaseSearcher} is supported per
* instance, the {@link IllegalStateException} is going to be thrown if more then one plugin tries to register
* {@link QueryPhaseSearcher} implementation.
*/
default Optional<QueryPhaseSearcher> getQueryPhaseSearcher() {
return Optional.empty();
}

/**
* The executor service provider (registered through {@link Plugin#getExecutorBuilders(Settings)} to be used at search
* time by {@link IndexSearcher}. The {@link IllegalStateException} is going to be thrown if more then one
* plugin tries to register index searcher executor.
*/
default Optional<ExecutorServiceProvider> getIndexSearcherExecutorProvider() {
return Optional.empty();
}

/**
* Executor service provider
*/
interface ExecutorServiceProvider {
/**
* Provides an executor service instance
* @param threadPool thread pool
* @return executor service instance
*/
ExecutorService getExecutor(ThreadPool threadPool);
}

/**
* Specification of custom {@link ScoreFunction}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.LongSupplier;

final class DefaultSearchContext extends SearchContext {
Expand Down Expand Up @@ -177,7 +178,8 @@ final class DefaultSearchContext extends SearchContext {
FetchPhase fetchPhase,
boolean lowLevelCancellation,
Version minNodeVersion,
boolean validate
boolean validate,
Executor executor
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand All @@ -198,7 +200,8 @@ final class DefaultSearchContext extends SearchContext {
engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation
lowLevelCancellation,
executor
);
this.relativeTimeSupplier = relativeTimeSupplier;
this.timeout = timeout;
Expand Down
52 changes: 52 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.search.BooleanQuery;
import org.opensearch.common.NamedRegistry;
import org.opensearch.common.Nullable;
import org.opensearch.common.ParseField;
import org.opensearch.common.geo.GeoShapeType;
import org.opensearch.common.geo.ShapesAvailability;
Expand Down Expand Up @@ -273,6 +274,8 @@
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.fetch.subphase.highlight.PlainHighlighter;
import org.opensearch.search.fetch.subphase.highlight.UnifiedHighlighter;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.search.rescore.QueryRescorerBuilder;
import org.opensearch.search.rescore.RescorerBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
Expand All @@ -293,11 +296,14 @@
import org.opensearch.search.suggest.phrase.StupidBackoff;
import org.opensearch.search.suggest.term.TermSuggestion;
import org.opensearch.search.suggest.term.TermSuggestionBuilder;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -329,6 +335,8 @@ public class SearchModule {
private final List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>();
private final List<NamedXContentRegistry.Entry> namedXContents = new ArrayList<>();
private final ValuesSourceRegistry valuesSourceRegistry;
private final QueryPhaseSearcher queryPhaseSearcher;
private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider;

/**
* Constructs a new SearchModule object
Expand All @@ -355,6 +363,8 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
registerSearchExts(plugins);
registerShapes();
registerIntervalsSourceProviders();
queryPhaseSearcher = registerQueryPhaseSearcher(plugins);
indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins);
namedWriteables.addAll(SortValue.namedWriteables());
}

Expand Down Expand Up @@ -1282,7 +1292,49 @@ private void registerSort(SortSpec<?> spec) {
);
}

private QueryPhaseSearcher registerQueryPhaseSearcher(List<SearchPlugin> plugins) {
QueryPhaseSearcher searcher = null;

for (SearchPlugin plugin : plugins) {
final Optional<QueryPhaseSearcher> searcherOpt = plugin.getQueryPhaseSearcher();

if (searcher == null) {
searcher = searcherOpt.orElse(null);
} else if (searcherOpt.isPresent()) {
throw new IllegalStateException("Only one QueryPhaseSearcher is allowed, but more than one are provided by the plugins");
}
}

return searcher;
}

private SearchPlugin.ExecutorServiceProvider registerIndexSearcherExecutorProvider(List<SearchPlugin> plugins) {
SearchPlugin.ExecutorServiceProvider provider = null;

for (SearchPlugin plugin : plugins) {
final Optional<SearchPlugin.ExecutorServiceProvider> providerOpt = plugin.getIndexSearcherExecutorProvider();

if (provider == null) {
provider = providerOpt.orElse(null);
} else if (providerOpt.isPresent()) {
throw new IllegalStateException(
"The index searcher executor is already assigned but more than one are provided by the plugins"
);
}
}

return provider;
}

public FetchPhase getFetchPhase() {
return new FetchPhase(fetchSubPhases);
}

public QueryPhase getQueryPhase() {
return (queryPhaseSearcher == null) ? new QueryPhase() : new QueryPhase(queryPhaseSearcher);
}

public @Nullable ExecutorService getIndexSearcherExecutor(ThreadPool pool) {
return (indexSearcherExecutorProvider == null) ? null : indexSearcherExecutorProvider.getExecutor(pool);
}
}
11 changes: 8 additions & 3 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,19 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private final AtomicInteger openScrollContexts = new AtomicInteger();
private final String sessionId = UUIDs.randomBase64UUID();
private final Executor indexSearcherExecutor;

public SearchService(
ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool,
ScriptService scriptService,
BigArrays bigArrays,
QueryPhase queryPhase,
FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor
) {
Settings settings = clusterService.getSettings();
this.threadPool = threadPool;
Expand All @@ -274,13 +277,14 @@ public SearchService(
this.scriptService = scriptService;
this.responseCollectorService = responseCollectorService;
this.bigArrays = bigArrays;
this.queryPhase = new QueryPhase();
this.queryPhase = queryPhase;
this.fetchPhase = fetchPhase;
this.multiBucketConsumerService = new MultiBucketConsumerService(
clusterService,
settings,
circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)
);
this.indexSearcherExecutor = indexSearcherExecutor;

TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
Expand Down Expand Up @@ -884,7 +888,8 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
fetchPhase,
lowLevelCancellation,
clusterService.state().nodes().getMinNodeVersion(),
validate
validate,
indexSearcherExecutor
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
Expand Down
Loading