From 1c273318ee6c58078cc15eb630f544b18198290d Mon Sep 17 00:00:00 2001 From: Vijayan Balasubramanian Date: Mon, 3 Jun 2024 10:32:44 -0700 Subject: [PATCH 1/3] Implement parallel execution of sub-queries for hybrid search (#749) Implement parallel execution of sub-queries for hybrid search Add new thread pool to schedule tasks that are related to hybrid query execution Register executor builders with Plugin Use Lucene's Task Executor to execute and collect results Parallelize Query re-write Parallelize score supplier creation Parallelize build hybrid scores Signed-off-by: Vijayan Balasubramanian --- CHANGELOG.md | 1 + .../executors/HybridQueryExecutor.java | 87 +++++++++++++++++++ .../HybridQueryExecutorCollector.java | 38 ++++++++ .../HybridQueryExecutorCollectorManager.java | 19 ++++ .../HybridQueryRewriteCollectorManager.java | 76 ++++++++++++++++ ...ridQueryScoreSupplierCollectorManager.java | 58 +++++++++++++ .../HybridQueryScoresCollectionManager.java | 54 ++++++++++++ .../neuralsearch/plugin/NeuralSearch.java | 9 ++ .../neuralsearch/query/HybridQuery.java | 46 +++++++--- .../neuralsearch/query/HybridQueryScorer.java | 29 ++++++- .../neuralsearch/query/HybridQueryWeight.java | 34 ++++++-- .../executors/HybridQueryExecutorIT.java | 30 +++++++ .../plugin/NeuralSearchTests.java | 16 ++++ 13 files changed, 479 insertions(+), 18 deletions(-) create mode 100644 src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutor.java create mode 100644 src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutorCollector.java create mode 100644 src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutorCollectorManager.java create mode 100644 src/main/java/org/opensearch/neuralsearch/executors/HybridQueryRewriteCollectorManager.java create mode 100644 src/main/java/org/opensearch/neuralsearch/executors/HybridQueryScoreSupplierCollectorManager.java create mode 100644 src/main/java/org/opensearch/neuralsearch/executors/HybridQueryScoresCollectionManager.java create mode 100644 src/test/java/org/opensearch/neuralsearch/executors/HybridQueryExecutorIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index eabbf72f8..04e5211ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Optimize parameter parsing in text chunking processor ([#733](https://github.com/opensearch-project/neural-search/pull/733)) - Use lazy initialization for priority queue of hits and scores to improve latencies by 20% ([#746](https://github.com/opensearch-project/neural-search/pull/746)) - Optimize max score calculation in the Query Phase of the Hybrid Search ([765](https://github.com/opensearch-project/neural-search/pull/765)) +- Implement parallel execution of sub-queries for hybrid search ([#749](https://github.com/opensearch-project/neural-search/pull/749)) ### Bug Fixes - Total hit count fix in Hybrid Query ([756](https://github.com/opensearch-project/neural-search/pull/756)) - Fix map type validation issue in multiple pipeline processors ([#661](https://github.com/opensearch-project/neural-search/pull/661)) diff --git a/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutor.java b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutor.java new file mode 100644 index 000000000..c099daf79 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutor.java @@ -0,0 +1,87 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.executors; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.experimental.PackagePrivate; +import org.apache.lucene.search.TaskExecutor; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.threadpool.ExecutorBuilder; +import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ThreadPool; + +/** + * {@link HybridQueryExecutor} provides necessary implementation and instances to execute + * sub-queries from hybrid query in parallel as a Task by caller. This ensures that one thread pool + * is used for hybrid query execution per node. The number of parallelization is also constrained + * by twice allocated processor count since most of the operation from hybrid search is expected to be + * short-lived thread. This will help us to achieve optimal parallelization and reasonable throughput. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class HybridQueryExecutor { + private static final String HYBRID_QUERY_EXEC_THREAD_POOL_NAME = "_plugin_neural_search_hybrid_query_executor"; + private static final Integer HYBRID_QUERY_EXEC_THREAD_POOL_QUEUE_SIZE = 1000; + private static final Integer MAX_THREAD_SIZE = 1000; + private static final Integer MIN_THREAD_SIZE = 2; + private static final Integer PROCESSOR_COUNT_MULTIPLIER = 2; + private static TaskExecutor taskExecutor; + + /** + * Provide fixed executor builder to use for hybrid query executors + * @param settings Node level settings + * @return the executor builder for hybrid query's custom thread pool. + */ + public static ExecutorBuilder getExecutorBuilder(final Settings settings) { + + int numberOfThreads = getFixedNumberOfThreadSize(settings); + return new FixedExecutorBuilder( + settings, + HYBRID_QUERY_EXEC_THREAD_POOL_NAME, + numberOfThreads, + HYBRID_QUERY_EXEC_THREAD_POOL_QUEUE_SIZE, + HYBRID_QUERY_EXEC_THREAD_POOL_NAME + ); + } + + /** + * Initialize @{@link TaskExecutor} to run tasks concurrently using {@link ThreadPool} + * @param threadPool OpenSearch's thread pool instance + */ + public static void initialize(ThreadPool threadPool) { + if (threadPool == null) { + throw new IllegalArgumentException( + "Argument thread-pool to Hybrid Query Executor cannot be null." + + "This is required to build executor to run actions in parallel" + ); + } + taskExecutor = new TaskExecutor(threadPool.executor(HYBRID_QUERY_EXEC_THREAD_POOL_NAME)); + } + + /** + * Return TaskExecutor Wrapper that helps runs tasks concurrently + * @return TaskExecutor instance to help run search tasks in parallel + */ + public static TaskExecutor getExecutor() { + return taskExecutor != null ? taskExecutor : new TaskExecutor(Runnable::run); + } + + @PackagePrivate + public static String getThreadPoolName() { + return HYBRID_QUERY_EXEC_THREAD_POOL_NAME; + } + + /** + * Will use thread size as twice the default allocated processor. We selected twice allocated processor + * since hybrid query action is expected to be short-lived . This will balance throughput and latency + * To avoid out of range, we will return 2 as minimum processor count and 1000 as maximum thread size + */ + private static int getFixedNumberOfThreadSize(final Settings settings) { + final int allocatedProcessors = OpenSearchExecutors.allocatedProcessors(settings); + int threadSize = Math.max(PROCESSOR_COUNT_MULTIPLIER * allocatedProcessors, MIN_THREAD_SIZE); + return Math.min(threadSize, MAX_THREAD_SIZE); + } +} diff --git a/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutorCollector.java b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutorCollector.java new file mode 100644 index 000000000..78bb05410 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutorCollector.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.executors; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Synchronized; + +import java.util.Optional; +import java.util.function.Function; + +/** + * {@link HybridQueryExecutorCollector} is a generic Collector used by Hybrid Search Query during + * Query phase to parallelize sub query's action to improve latency + */ +@RequiredArgsConstructor(staticName = "newCollector", access = AccessLevel.PACKAGE) +public final class HybridQueryExecutorCollector { + + // will be used as input for all instances of collector generated by newCollector method, + // if it is required for collect operation + private final I param; + + // getResult should only be called after collector's collect method is invoked. + @Getter(onMethod_ = { @Synchronized }) + private Optional result = Optional.empty(); + + /** + * Called once for every time an action has to be performed on this Collector + * @param action function that will be executed and result will be stored at result. + */ + @Synchronized + public void collect(Function action) { + result = Optional.ofNullable(action.apply(param)); + } +} diff --git a/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutorCollectorManager.java b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutorCollectorManager.java new file mode 100644 index 000000000..45bbde025 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutorCollectorManager.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.executors; + +/** + * {@link HybridQueryExecutorCollectorManager} is responsible for creating new {@link HybridQueryExecutorCollector} instances + */ +public interface HybridQueryExecutorCollectorManager { + /** + * Return a new Collector instance that extends {@link HybridQueryExecutor}. + * This will be used during Hybrid Search when sub queries wants to execute part of + * operation that is independent of each other that can be parallelized to improve + * the performance. + * @return HybridQueryExecutorCollector + */ + C newCollector(); +} diff --git a/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryRewriteCollectorManager.java b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryRewriteCollectorManager.java new file mode 100644 index 000000000..587ca8915 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryRewriteCollectorManager.java @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.executors; + +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * {@link HybridQueryRewriteCollectorManager} is responsible for creating {@link HybridQueryExecutorCollector} + * instances. Useful to create {@link HybridQueryExecutorCollector} instances that rewrites {@link Query} into primitive + * {@link Query} using {@link IndexSearcher} + */ +@RequiredArgsConstructor +public final class HybridQueryRewriteCollectorManager implements HybridQueryExecutorCollectorManager { + + private @NonNull IndexSearcher searcher; + + /** + * Returns new {@link HybridQueryExecutorCollector} to facilitate parallel execution + * @return HybridQueryExecutorCollector instance + */ + @Override + public HybridQueryExecutorCollector> newCollector() { + return HybridQueryExecutorCollector.newCollector(searcher); + } + + /** + * Returns list of {@link Query} that were rewritten by collectors. If collector doesn't + * have any result, null will be inserted to the result. + * This method must be called after collection is finished on all provided collectors. + * @param collectors list of collectors + * @return list of {@link Query} that was rewritten by corresponding collector from input. + */ + public List getQueriesAfterRewrite(List>> collectors) { + List rewrittenQueries = new ArrayList<>(); + for (HybridQueryExecutorCollector> collector : collectors) { + if (collector.getResult().isPresent()) { + rewrittenQueries.add(collector.getResult().get().getKey()); + } else { + // if for some reason collector didn't have result, we will add null to its + // position in the result. + rewrittenQueries.add(null); + } + } + return rewrittenQueries; + } + + /** + * Returns true if any of the {@link Query} from collector were actually rewritten. + * If any of the given collector doesn't have result, it will be ignored as if that + * instance did not exist. This method must be called after collection is finished + * on all provided collectors. + * @param collectors List of collectors to check any of their query was rewritten during + * collect step. + * @return at least one query is rewritten by any of the collectors + */ + public boolean anyQueryRewrite(List>> collectors) { + // return true if at least one query is rewritten + for (HybridQueryExecutorCollector> collector : collectors) { + final Optional> result = collector.getResult(); + if (result.isPresent() && result.get().getValue()) { + return true; + } + } + return false; + } +} diff --git a/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryScoreSupplierCollectorManager.java b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryScoreSupplierCollectorManager.java new file mode 100644 index 000000000..078a069c9 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryScoreSupplierCollectorManager.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.executors; + +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.ScorerSupplier; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * HybridQueryScoreSupplierCollectorManager is responsible for creating {@link HybridQueryExecutorCollector} instances. + * Useful to create {@link HybridQueryExecutorCollector} instances that build {@link ScorerSupplier} from + * given weight. + */ +@RequiredArgsConstructor +public final class HybridQueryScoreSupplierCollectorManager + implements + HybridQueryExecutorCollectorManager> { + + private @NonNull LeafReaderContext context; + + /** + * Creates new {@link HybridQueryExecutorCollector} instance everytime to facilitate parallel execution + * by individual tasks + * @return new instance of HybridQueryExecutorCollector + */ + @Override + public HybridQueryExecutorCollector newCollector() { + return HybridQueryExecutorCollector.newCollector(context); + } + + /** + * mergeScoreSuppliers will build list of scoreSupplier from given list of collectors. + * This method should be called after HybridQueryExecutorCollector's collect method is called. + * If collectors didn't have any result, null will be added to list. + * This method must be called after collection is finished on all provided collectors. + * @param collectors List of collectors which is used to perform collection in parallel + * @return list of {@link ScorerSupplier} + */ + public List mergeScoreSuppliers(List> collectors) { + List scorerSuppliers = new ArrayList<>(); + for (HybridQueryExecutorCollector collector : collectors) { + Optional result = collector.getResult(); + if (result.isPresent()) { + scorerSuppliers.add(result.get()); + } else { + scorerSuppliers.add(null); + } + } + return scorerSuppliers; + } +} diff --git a/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryScoresCollectionManager.java b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryScoresCollectionManager.java new file mode 100644 index 000000000..2b8409370 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryScoresCollectionManager.java @@ -0,0 +1,54 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.executors; + +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; +import java.util.Optional; + +/** + * {@link HybridQueryScoresCollectionManager} is responsible for creating {@link HybridQueryExecutorCollector} instances. + * Useful to create {@link HybridQueryExecutorCollector} instances that calls score method on individual + * scorer + */ +@NoArgsConstructor +public final class HybridQueryScoresCollectionManager + implements + HybridQueryExecutorCollectorManager> { + + /** + * Returns new {@link HybridQueryExecutorCollector} instance to facilitate parallel execution + * by individual tasks + * @return HybridQueryExecutorCollector instance + */ + @Override + public HybridQueryExecutorCollector newCollector() { + return HybridQueryExecutorCollector.newCollector(null); + } + + /** + * Update scores from collectors that was previously collected from scorer. + * Collector will provide score and index of scorer to map it back to score array. + * This method must be called after collection is finished on all provided collectors. + * @param collectors List of scorers where we want to calculate score. + * @param scores Float array to combine scores from available scores + */ + public void updateScores(final List> collectors, final float[] scores) { + for (HybridQueryExecutorCollector collector : collectors) { + final Optional result = collector.getResult(); + if (result.isPresent()) { + scores[result.get().getIndex()] = result.get().getScore(); + } + } + } + + @Data(staticConstructor = "of") + public static class ScoreWrapperFromCollector { + private final int index; + private final float score; + } +} diff --git a/src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java b/src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java index f74352012..8b173ba81 100644 --- a/src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java +++ b/src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java @@ -18,6 +18,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -25,6 +26,7 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.ingest.Processor; import org.opensearch.ml.client.MachineLearningNodeClient; +import org.opensearch.neuralsearch.executors.HybridQueryExecutor; import org.opensearch.neuralsearch.ml.MLCommonsClientAccessor; import org.opensearch.neuralsearch.processor.NeuralQueryEnricherProcessor; import org.opensearch.neuralsearch.processor.NeuralSparseTwoPhaseProcessor; @@ -63,6 +65,7 @@ import org.opensearch.search.pipeline.SearchRequestProcessor; import org.opensearch.search.pipeline.SearchResponseProcessor; import org.opensearch.search.query.QueryPhaseSearcher; +import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -95,6 +98,7 @@ public Collection createComponents( NeuralSearchClusterUtil.instance().initialize(clusterService); NeuralQueryBuilder.initialize(clientAccessor); NeuralSparseQueryBuilder.initialize(clientAccessor); + HybridQueryExecutor.initialize(threadPool); normalizationProcessorWorkflow = new NormalizationProcessorWorkflow(new ScoreNormalizer(), new ScoreCombiner()); return List.of(clientAccessor); } @@ -108,6 +112,11 @@ public List> getQueries() { ); } + @Override + public List> getExecutorBuilders(Settings settings) { + return List.of(HybridQueryExecutor.getExecutorBuilder(settings)); + } + @Override public Map getProcessors(Processor.Parameters parameters) { clientAccessor = new MLCommonsClientAccessor(new MachineLearningNodeClient(parameters.client)); diff --git a/src/main/java/org/opensearch/neuralsearch/query/HybridQuery.java b/src/main/java/org/opensearch/neuralsearch/query/HybridQuery.java index db09f6ebc..60d5870da 100644 --- a/src/main/java/org/opensearch/neuralsearch/query/HybridQuery.java +++ b/src/main/java/org/opensearch/neuralsearch/query/HybridQuery.java @@ -5,12 +5,15 @@ package org.opensearch.neuralsearch.query; import java.io.IOException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.Callable; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; @@ -20,6 +23,9 @@ import org.apache.lucene.search.QueryVisitor; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; +import org.opensearch.neuralsearch.executors.HybridQueryExecutor; +import org.opensearch.neuralsearch.executors.HybridQueryExecutorCollector; +import org.opensearch.neuralsearch.executors.HybridQueryRewriteCollectorManager; /** * Implementation of Query interface for type "hybrid". It allows execution of multiple sub-queries and collect individual @@ -106,22 +112,40 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException { return new MatchNoDocsQuery("empty HybridQuery"); } - boolean actuallyRewritten = false; - List rewrittenSubQueries = new ArrayList<>(); + final HybridQueryRewriteCollectorManager manager = new HybridQueryRewriteCollectorManager(indexSearcher); + final List> queryRewriteTasks = new ArrayList<>(); + final List>> collectors = new ArrayList<>(); for (Query subQuery : subQueries) { - Query rewrittenSub = subQuery.rewrite(indexSearcher); - /* we keep rewrite sub-query unless it's not equal to itself, it may take multiple levels of recursive calls - queries need to be rewritten from high-level clauses into lower-level clauses because low-level clauses - perform better. For hybrid query we need to track progress of re-write for all sub-queries */ - actuallyRewritten |= rewrittenSub != subQuery; - rewrittenSubQueries.add(rewrittenSub); + final HybridQueryExecutorCollector> collector = manager.newCollector(); + collectors.add(collector); + queryRewriteTasks.add(() -> rewriteQuery(subQuery, collector)); } - if (actuallyRewritten) { - return new HybridQuery(rewrittenSubQueries); + HybridQueryExecutor.getExecutor().invokeAll(queryRewriteTasks); + + final boolean isAnyQueryRewritten = manager.anyQueryRewrite(collectors); + if (isAnyQueryRewritten == false) { + return super.rewrite(indexSearcher); } + final List rewrittenSubQueries = manager.getQueriesAfterRewrite(collectors); + return new HybridQuery(rewrittenSubQueries); + } - return super.rewrite(indexSearcher); + private Void rewriteQuery(Query query, HybridQueryExecutorCollector> collector) { + collector.collect(indexSearcher -> { + try { + Query rewrittenQuery = query.rewrite(indexSearcher); + /* we keep rewrite sub-query unless it's not equal to itself, it may take multiple levels of recursive calls + queries need to be rewritten from high-level clauses into lower-level clauses because low-level clauses + perform better. For hybrid query we need to track progress of re-write for all sub-queries */ + + boolean actuallyRewritten = rewrittenQuery != query; + return new AbstractMap.SimpleEntry(rewrittenQuery, actuallyRewritten); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + return null; } /** diff --git a/src/main/java/org/opensearch/neuralsearch/query/HybridQueryScorer.java b/src/main/java/org/opensearch/neuralsearch/query/HybridQueryScorer.java index 23dbd0e1d..59cc824d6 100644 --- a/src/main/java/org/opensearch/neuralsearch/query/HybridQueryScorer.java +++ b/src/main/java/org/opensearch/neuralsearch/query/HybridQueryScorer.java @@ -15,6 +15,10 @@ import org.apache.lucene.search.TwoPhaseIterator; import org.apache.lucene.search.Weight; import org.apache.lucene.util.PriorityQueue; +import org.opensearch.neuralsearch.executors.HybridQueryExecutor; +import org.opensearch.neuralsearch.executors.HybridQueryExecutorCollector; +import org.opensearch.neuralsearch.executors.HybridQueryScoresCollectionManager; +import org.opensearch.neuralsearch.executors.HybridQueryScoresCollectionManager.ScoreWrapperFromCollector; import org.opensearch.neuralsearch.search.HybridDisiWrapper; import java.io.IOException; @@ -23,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.Callable; /** * Class abstracts functionality of Scorer for hybrid query. When iterating over documents in increasing @@ -183,18 +188,38 @@ public int docID() { public float[] hybridScores() throws IOException { float[] scores = new float[numSubqueries]; DisiWrapper topList = subScorersPQ.topList(); + final HybridQueryScoresCollectionManager manager = new HybridQueryScoresCollectionManager(); + final List> scoreTasks = new ArrayList<>(); + final List> collectors = new ArrayList<>(); + for (HybridDisiWrapper disiWrapper = (HybridDisiWrapper) topList; disiWrapper != null; disiWrapper = (HybridDisiWrapper) disiWrapper.next) { // check if this doc has match in the subQuery. If not, add score as 0.0 and continue - Scorer scorer = disiWrapper.scorer; + final Scorer scorer = disiWrapper.scorer; if (scorer.docID() == DocIdSetIterator.NO_MORE_DOCS) { continue; } - scores[disiWrapper.getSubQueryIndex()] = scorer.score(); + final HybridQueryExecutorCollector collector = manager.newCollector(); + collectors.add(collector); + final int index = disiWrapper.getSubQueryIndex(); + scoreTasks.add(() -> score(scorer, index, collector)); } + HybridQueryExecutor.getExecutor().invokeAll(scoreTasks); + manager.updateScores(collectors, scores); return scores; } + private Void score(Scorer scorer, int index, HybridQueryExecutorCollector collector) { + collector.collect(unUsed -> { + try { + return ScoreWrapperFromCollector.of(index, scorer.score()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + return null; + } + private DisiPriorityQueue initializeSubScorersPQ() { Objects.requireNonNull(subScorers, "should not be null"); // we need to count this way in order to include all identical sub-queries diff --git a/src/main/java/org/opensearch/neuralsearch/query/HybridQueryWeight.java b/src/main/java/org/opensearch/neuralsearch/query/HybridQueryWeight.java index facb79694..dc1f5e112 100644 --- a/src/main/java/org/opensearch/neuralsearch/query/HybridQueryWeight.java +++ b/src/main/java/org/opensearch/neuralsearch/query/HybridQueryWeight.java @@ -8,6 +8,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.Callable; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; @@ -20,6 +21,9 @@ import org.apache.lucene.search.Scorer; import org.apache.lucene.search.ScorerSupplier; import org.apache.lucene.search.Weight; +import org.opensearch.neuralsearch.executors.HybridQueryExecutor; +import org.opensearch.neuralsearch.executors.HybridQueryExecutorCollector; +import org.opensearch.neuralsearch.executors.HybridQueryScoreSupplierCollectorManager; import static org.opensearch.neuralsearch.query.HybridQueryBuilder.MAX_NUMBER_OF_SUB_QUERIES; @@ -68,20 +72,40 @@ public Matches matches(LeafReaderContext context, int doc) throws IOException { return MatchesUtils.fromSubMatches(mis); } + /** + * Returns {@link HybridScorerSupplier} which contains list of {@link ScorerSupplier} from its + * sub queries. Here, add score supplier from individual sub query is parallelized and finally + * {@link HybridScorerSupplier} is created with list of {@link ScorerSupplier} + */ @Override public ScorerSupplier scorerSupplier(LeafReaderContext context) throws IOException { - List scorerSuppliers = new ArrayList<>(); - for (Weight w : weights) { - ScorerSupplier ss = w.scorerSupplier(context); - scorerSuppliers.add(ss); + HybridQueryScoreSupplierCollectorManager manager = new HybridQueryScoreSupplierCollectorManager(context); + List> scoreSupplierTasks = new ArrayList<>(); + List> collectors = new ArrayList<>(); + for (Weight weight : weights) { + HybridQueryExecutorCollector collector = manager.newCollector(); + collectors.add(collector); + scoreSupplierTasks.add(() -> addScoreSupplier(weight, collector)); } - + HybridQueryExecutor.getExecutor().invokeAll(scoreSupplierTasks); + final List scorerSuppliers = manager.mergeScoreSuppliers(collectors); if (scorerSuppliers.isEmpty()) { return null; } return new HybridScorerSupplier(scorerSuppliers, this, scoreMode); } + private Void addScoreSupplier(Weight weight, HybridQueryExecutorCollector collector) { + collector.collect(leafReaderContext -> { + try { + return weight.scorerSupplier(leafReaderContext); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + return null; + } + /** * Create the scorer used to score our associated Query * diff --git a/src/test/java/org/opensearch/neuralsearch/executors/HybridQueryExecutorIT.java b/src/test/java/org/opensearch/neuralsearch/executors/HybridQueryExecutorIT.java new file mode 100644 index 000000000..5aaa22caa --- /dev/null +++ b/src/test/java/org/opensearch/neuralsearch/executors/HybridQueryExecutorIT.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.executors; + +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.junit.Assert; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.neuralsearch.OpenSearchSecureRestTestCase; +import org.opensearch.rest.RestRequest; + +import java.io.IOException; + +import static org.opensearch.neuralsearch.executors.HybridQueryExecutor.getThreadPoolName; + +public class HybridQueryExecutorIT extends OpenSearchSecureRestTestCase { + + public void testHybridQueryExecutorThreadIsInitialized() throws IOException, ParseException { + final Request request = new Request(RestRequest.Method.GET.name(), String.join("/", "_cat", "thread_pool", getThreadPoolName())); + final Response response = client().performRequest(request); + assertOK(response); + + final String responseBody = EntityUtils.toString(response.getEntity()); + Assert.assertNotNull(responseBody); + Assert.assertTrue(responseBody.contains(getThreadPoolName())); + } +} diff --git a/src/test/java/org/opensearch/neuralsearch/plugin/NeuralSearchTests.java b/src/test/java/org/opensearch/neuralsearch/plugin/NeuralSearchTests.java index d0d5b82be..58a42c439 100644 --- a/src/test/java/org/opensearch/neuralsearch/plugin/NeuralSearchTests.java +++ b/src/test/java/org/opensearch/neuralsearch/plugin/NeuralSearchTests.java @@ -30,6 +30,8 @@ import org.opensearch.search.pipeline.SearchPhaseResultsProcessor; import org.opensearch.search.pipeline.SearchRequestProcessor; import org.opensearch.search.query.QueryPhaseSearcher; +import org.opensearch.threadpool.ExecutorBuilder; +import org.opensearch.threadpool.FixedExecutorBuilder; public class NeuralSearchTests extends OpenSearchQueryTestCase { @@ -105,4 +107,18 @@ public void testRequestProcessors() { assertNotNull(processors.get(NeuralQueryEnricherProcessor.TYPE)); assertNotNull(processors.get(NeuralSparseTwoPhaseProcessor.TYPE)); } + + public void testExecutionBuilders() { + NeuralSearch plugin = new NeuralSearch(); + Settings settings = Settings.builder().build(); + Environment environment = mock(Environment.class); + when(environment.settings()).thenReturn(settings); + final List> executorBuilders = plugin.getExecutorBuilders(settings); + + assertNotNull(executorBuilders); + assertFalse(executorBuilders.isEmpty()); + assertEquals("Unexpected number of executor builders are registered", 1, executorBuilders.size()); + assertTrue(executorBuilders.get(0) instanceof FixedExecutorBuilder); + } + } From 352dd45dc1d91bb39a3058ef89e142535d37e37e Mon Sep 17 00:00:00 2001 From: Vijayan Balasubramanian Date: Thu, 6 Jun 2024 14:59:36 -0700 Subject: [PATCH 2/3] Remove parallelization while collecting hybrid scores (#779) This parallelization is not adding any value after comparing the benchmarks with and without this optimization. Hence removing it. Signed-off-by: Vijayan Balasubramanian --- .../HybridQueryScoresCollectionManager.java | 54 ------------------- .../neuralsearch/query/HybridQueryScorer.java | 29 +--------- 2 files changed, 2 insertions(+), 81 deletions(-) delete mode 100644 src/main/java/org/opensearch/neuralsearch/executors/HybridQueryScoresCollectionManager.java diff --git a/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryScoresCollectionManager.java b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryScoresCollectionManager.java deleted file mode 100644 index 2b8409370..000000000 --- a/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryScoresCollectionManager.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.neuralsearch.executors; - -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.List; -import java.util.Optional; - -/** - * {@link HybridQueryScoresCollectionManager} is responsible for creating {@link HybridQueryExecutorCollector} instances. - * Useful to create {@link HybridQueryExecutorCollector} instances that calls score method on individual - * scorer - */ -@NoArgsConstructor -public final class HybridQueryScoresCollectionManager - implements - HybridQueryExecutorCollectorManager> { - - /** - * Returns new {@link HybridQueryExecutorCollector} instance to facilitate parallel execution - * by individual tasks - * @return HybridQueryExecutorCollector instance - */ - @Override - public HybridQueryExecutorCollector newCollector() { - return HybridQueryExecutorCollector.newCollector(null); - } - - /** - * Update scores from collectors that was previously collected from scorer. - * Collector will provide score and index of scorer to map it back to score array. - * This method must be called after collection is finished on all provided collectors. - * @param collectors List of scorers where we want to calculate score. - * @param scores Float array to combine scores from available scores - */ - public void updateScores(final List> collectors, final float[] scores) { - for (HybridQueryExecutorCollector collector : collectors) { - final Optional result = collector.getResult(); - if (result.isPresent()) { - scores[result.get().getIndex()] = result.get().getScore(); - } - } - } - - @Data(staticConstructor = "of") - public static class ScoreWrapperFromCollector { - private final int index; - private final float score; - } -} diff --git a/src/main/java/org/opensearch/neuralsearch/query/HybridQueryScorer.java b/src/main/java/org/opensearch/neuralsearch/query/HybridQueryScorer.java index 59cc824d6..23dbd0e1d 100644 --- a/src/main/java/org/opensearch/neuralsearch/query/HybridQueryScorer.java +++ b/src/main/java/org/opensearch/neuralsearch/query/HybridQueryScorer.java @@ -15,10 +15,6 @@ import org.apache.lucene.search.TwoPhaseIterator; import org.apache.lucene.search.Weight; import org.apache.lucene.util.PriorityQueue; -import org.opensearch.neuralsearch.executors.HybridQueryExecutor; -import org.opensearch.neuralsearch.executors.HybridQueryExecutorCollector; -import org.opensearch.neuralsearch.executors.HybridQueryScoresCollectionManager; -import org.opensearch.neuralsearch.executors.HybridQueryScoresCollectionManager.ScoreWrapperFromCollector; import org.opensearch.neuralsearch.search.HybridDisiWrapper; import java.io.IOException; @@ -27,7 +23,6 @@ import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.concurrent.Callable; /** * Class abstracts functionality of Scorer for hybrid query. When iterating over documents in increasing @@ -188,38 +183,18 @@ public int docID() { public float[] hybridScores() throws IOException { float[] scores = new float[numSubqueries]; DisiWrapper topList = subScorersPQ.topList(); - final HybridQueryScoresCollectionManager manager = new HybridQueryScoresCollectionManager(); - final List> scoreTasks = new ArrayList<>(); - final List> collectors = new ArrayList<>(); - for (HybridDisiWrapper disiWrapper = (HybridDisiWrapper) topList; disiWrapper != null; disiWrapper = (HybridDisiWrapper) disiWrapper.next) { // check if this doc has match in the subQuery. If not, add score as 0.0 and continue - final Scorer scorer = disiWrapper.scorer; + Scorer scorer = disiWrapper.scorer; if (scorer.docID() == DocIdSetIterator.NO_MORE_DOCS) { continue; } - final HybridQueryExecutorCollector collector = manager.newCollector(); - collectors.add(collector); - final int index = disiWrapper.getSubQueryIndex(); - scoreTasks.add(() -> score(scorer, index, collector)); + scores[disiWrapper.getSubQueryIndex()] = scorer.score(); } - HybridQueryExecutor.getExecutor().invokeAll(scoreTasks); - manager.updateScores(collectors, scores); return scores; } - private Void score(Scorer scorer, int index, HybridQueryExecutorCollector collector) { - collector.collect(unUsed -> { - try { - return ScoreWrapperFromCollector.of(index, scorer.score()); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - return null; - } - private DisiPriorityQueue initializeSubScorersPQ() { Objects.requireNonNull(subScorers, "should not be null"); // we need to count this way in order to include all identical sub-queries From 7b9d4226615041b377333136774b667b346446d3 Mon Sep 17 00:00:00 2001 From: Vijayan Balasubramanian Date: Tue, 11 Jun 2024 10:50:30 -0700 Subject: [PATCH 3/3] Update execption formatting Signed-off-by: Vijayan Balasubramanian --- .../opensearch/neuralsearch/executors/HybridQueryExecutor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutor.java b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutor.java index c099daf79..b185cac8b 100644 --- a/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutor.java +++ b/src/main/java/org/opensearch/neuralsearch/executors/HybridQueryExecutor.java @@ -54,8 +54,7 @@ public static ExecutorBuilder getExecutorBuilder(final Settings settings) { public static void initialize(ThreadPool threadPool) { if (threadPool == null) { throw new IllegalArgumentException( - "Argument thread-pool to Hybrid Query Executor cannot be null." - + "This is required to build executor to run actions in parallel" + "Argument thread-pool to Hybrid Query Executor cannot be null. This is required to build executor to run actions in parallel" ); } taskExecutor = new TaskExecutor(threadPool.executor(HYBRID_QUERY_EXEC_THREAD_POOL_NAME));