Skip to content

Commit

Permalink
Re-use exector from queryphaseresultconsumer
Browse files Browse the repository at this point in the history
Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
  • Loading branch information
harshavamsi committed Jan 30, 2025
1 parent 9aa70ab commit cb6624a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
reducePhase = controller.reducedFromStream(
results.asList().stream().map(r -> (StreamSearchResult) r).collect(Collectors.toList()),
aggReduceContextBuilder,
performFinalReduce
performFinalReduce,
executor
);
logger.info("Will reduce results for {}", results.get(0));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,11 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
Expand Down Expand Up @@ -767,51 +765,49 @@ public TicketProcessorResult call() throws Exception {
public ReducedQueryPhase reducedFromStream(
List<StreamSearchResult> list,
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
boolean performFinalReduce
boolean performFinalReduce,
Executor executor
) {
try {

List<byte[]> tickets = list.stream()
.flatMap(r -> r.getFlightTickets().stream())
.map(OSTicket::getBytes)
.collect(Collectors.toList());

int threadCount = Math.min(Runtime.getRuntime().availableProcessors(), tickets.size());
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

List<Future<TicketProcessorResult>> futures = new ArrayList<>();
List<CompletableFuture<TicketProcessorResult>> futures = new ArrayList<>();
int totalRows = 0;
Map<String, Long> bucketMap = new ConcurrentHashMap<>();

List<ScoreDoc> scoreDocs = new ArrayList<>();
TotalHits totalHits = new TotalHits(totalRows, Relation.EQUAL_TO);
List<InternalAggregation> aggs = new ArrayList<>();

try {
for (byte[] ticket : tickets) {
futures.add(executorService.submit(new TicketProcessor(ticket, streamManager)));
CompletableFuture<TicketProcessorResult> future = CompletableFuture.supplyAsync(() -> {
try {
return new TicketProcessor(ticket, streamManager).call();
} catch (Exception e) {
throw new CompletionException(e);
}
}, executor);
futures.add(future);
}

for (Future<TicketProcessorResult> future : futures) {
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

allFutures.join();
for (CompletableFuture<TicketProcessorResult> future : futures) {
TicketProcessorResult result = future.get();
totalRows += result.getRowCount();
result.getBucketMap().forEach((key, value) -> bucketMap.merge(key, value, Long::sum));
}
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Error processing tickets in parallel", e);
} finally {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}

TotalHits totalHits = new TotalHits(totalRows, Relation.EQUAL_TO);

List<BucketOrder> orders = new ArrayList<>();
orders.add(BucketOrder.count(false));
List<StringTerms.Bucket> buckets = new ArrayList<>();
Expand All @@ -827,6 +823,7 @@ public ReducedQueryPhase reducedFromStream(
)
);
});

aggs.add(
new StringTerms(
"categories",
Expand All @@ -843,9 +840,6 @@ public ReducedQueryPhase reducedFromStream(
)
);

// InternalAggregations finalReduce = reduceAggs(aggReduceContextBuilder, performFinalReduce,
// List.of(InternalAggregations.from(aggs)));

return new ReducedQueryPhase(
totalHits,
totalRows,
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,10 @@ private Executor getExecutor(IndexShard indexShard) {
return threadPool.executor(executorName);
}

public Executor getExecutor() {
return threadPool.executor(Names.SEARCH);
}

public void executeFetchPhase(
InternalScrollSearchRequest request,
SearchShardTask task,
Expand Down

0 comments on commit cb6624a

Please sign in to comment.