From cb6624a142de75a2231a4f8b90cdb99b07888bd6 Mon Sep 17 00:00:00 2001 From: Harsha Vamsi Kalluri Date: Thu, 30 Jan 2025 10:42:49 -0800 Subject: [PATCH] Re-use exector from queryphaseresultconsumer Signed-off-by: Harsha Vamsi Kalluri --- .../search/QueryPhaseResultConsumer.java | 3 +- .../action/search/SearchPhaseController.java | 46 ++++++++----------- .../org/opensearch/search/SearchService.java | 4 ++ 3 files changed, 26 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java index 39d527f345da4..b4879c2e4b6dd 100644 --- a/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java @@ -153,7 +153,8 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { reducePhase = controller.reducedFromStream( results.asList().stream().map(r -> (StreamSearchResult) r).collect(Collectors.toList()), aggReduceContextBuilder, - performFinalReduce + performFinalReduce, + executor ); logger.info("Will reduce results for {}", results.get(0)); } else { diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java b/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java index cdfb7ee1bb25e..113c6d0afa16c 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java @@ -97,13 +97,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntFunction; @@ -767,32 +765,38 @@ public TicketProcessorResult call() throws Exception { public ReducedQueryPhase reducedFromStream( List list, InternalAggregation.ReduceContextBuilder aggReduceContextBuilder, - boolean performFinalReduce + boolean performFinalReduce, + Executor executor ) { try { - List tickets = list.stream() .flatMap(r -> r.getFlightTickets().stream()) .map(OSTicket::getBytes) .collect(Collectors.toList()); - int threadCount = Math.min(Runtime.getRuntime().availableProcessors(), tickets.size()); - ExecutorService executorService = Executors.newFixedThreadPool(threadCount); - - List> futures = new ArrayList<>(); + List> futures = new ArrayList<>(); int totalRows = 0; Map bucketMap = new ConcurrentHashMap<>(); List scoreDocs = new ArrayList<>(); - TotalHits totalHits = new TotalHits(totalRows, Relation.EQUAL_TO); List aggs = new ArrayList<>(); try { for (byte[] ticket : tickets) { - futures.add(executorService.submit(new TicketProcessor(ticket, streamManager))); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + try { + return new TicketProcessor(ticket, streamManager).call(); + } catch (Exception e) { + throw new CompletionException(e); + } + }, executor); + futures.add(future); } - for (Future future : futures) { + CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + + allFutures.join(); + for (CompletableFuture future : futures) { TicketProcessorResult result = future.get(); totalRows += result.getRowCount(); result.getBucketMap().forEach((key, value) -> bucketMap.merge(key, value, Long::sum)); @@ -800,18 +804,10 @@ public ReducedQueryPhase reducedFromStream( } catch (InterruptedException | ExecutionException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Error processing tickets in parallel", e); - } finally { - executorService.shutdown(); - try { - if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - executorService.shutdownNow(); - Thread.currentThread().interrupt(); - } } + TotalHits totalHits = new TotalHits(totalRows, Relation.EQUAL_TO); + List orders = new ArrayList<>(); orders.add(BucketOrder.count(false)); List buckets = new ArrayList<>(); @@ -827,6 +823,7 @@ public ReducedQueryPhase reducedFromStream( ) ); }); + aggs.add( new StringTerms( "categories", @@ -843,9 +840,6 @@ public ReducedQueryPhase reducedFromStream( ) ); - // InternalAggregations finalReduce = reduceAggs(aggReduceContextBuilder, performFinalReduce, - // List.of(InternalAggregations.from(aggs))); - return new ReducedQueryPhase( totalHits, totalRows, diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 913be2211d759..7b3f64a9e7a93 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -954,6 +954,10 @@ private Executor getExecutor(IndexShard indexShard) { return threadPool.executor(executorName); } + public Executor getExecutor() { + return threadPool.executor(Names.SEARCH); + } + public void executeFetchPhase( InternalScrollSearchRequest request, SearchShardTask task,