From cecdd96d84c494c17bc5de87a5a6e1114fe6d9b0 Mon Sep 17 00:00:00 2001 From: sahil <61558528+buddharajusahil@users.noreply.github.com> Date: Fri, 30 Jun 2023 14:02:18 -0700 Subject: [PATCH 01/45] CoordinatorStats Signed-off-by: sahil buddharaju --- .idea/runConfigurations/Debug_OpenSearch.xml | 18 +- .../search/stats/SearchStatsIT.java | 11 +- .../opensearch/action/CoordinatorStats.java | 30 ++ .../admin/indices/stats/CommonStats.java | 8 + .../search/AbstractSearchAsyncAction.java | 83 +++- .../action/search/SearchCoordinatorStats.java | 208 +++++++++ .../opensearch/action/search/SearchPhase.java | 9 + .../action/search/SearchPhaseContext.java | 2 + .../SearchRequestOperationsListener.java | 273 ++++++++++++ .../action/search/TransportSearchAction.java | 47 ++- .../common/metrics/CounterMetric.java | 2 + .../opensearch/common/metrics/MeanMetric.java | 1 + .../index/search/stats/SearchStats.java | 157 +++++++ .../opensearch/indices/IndicesService.java | 9 +- .../opensearch/indices/NodeIndicesStats.java | 5 +- .../main/java/org/opensearch/node/Node.java | 8 +- .../AbstractSearchAsyncActionTests.java | 350 ++++++++++++++++ .../action/search/MockSearchPhaseContext.java | 5 + .../search/SearchCoordinatorStatsTests.java | 128 ++++++ .../SearchQueryThenFetchAsyncActionTests.java | 4 +- .../SearchRequestOperationsListenerTests.java | 395 ++++++++++++++++++ .../index/search/stats/SearchStatsTests.java | 43 ++ .../indices/NodeIndicesStatsTests.java | 2 +- .../snapshots/SnapshotResiliencyTests.java | 6 +- 24 files changed, 1763 insertions(+), 41 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/CoordinatorStats.java create mode 100644 server/src/main/java/org/opensearch/action/search/SearchCoordinatorStats.java create mode 100644 server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java create mode 100644 server/src/test/java/org/opensearch/action/search/SearchCoordinatorStatsTests.java create mode 100644 server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java diff --git a/.idea/runConfigurations/Debug_OpenSearch.xml b/.idea/runConfigurations/Debug_OpenSearch.xml index 0d8bf59823acf..fddcf47728460 100644 --- a/.idea/runConfigurations/Debug_OpenSearch.xml +++ b/.idea/runConfigurations/Debug_OpenSearch.xml @@ -1,11 +1,11 @@ - - + + diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java index c72b5d40553b3..e5edc1b93e0a8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java @@ -165,8 +165,17 @@ public void testSimpleStats() throws Exception { Set nodeIdsWithIndex = nodeIdsWithIndex("test1", "test2"); int num = 0; + int coordNumber = 0; + for (NodeStats stat : nodeStats.getNodes()) { Stats total = stat.getIndices().getSearch().getTotal(); + if (total.getCoordinatorStatsLongHolder().queryMetric > 0) { + assertThat(total.getCoordinatorStatsLongHolder().queryTotal, greaterThan(0L)); + assertThat(total.getCoordinatorStatsLongHolder().fetchMetric, greaterThan(0L)); + assertThat(total.getCoordinatorStatsLongHolder().fetchTotal, greaterThan(0L)); + assertThat(total.getCoordinatorStatsLongHolder().expandSearchTotal, greaterThan(0L)); + coordNumber += 1; + } if (nodeIdsWithIndex.contains(stat.getNode().getId())) { assertThat(total.getQueryCount(), greaterThan(0L)); assertThat(total.getQueryTimeInMillis(), greaterThan(0L)); @@ -176,7 +185,7 @@ public void testSimpleStats() throws Exception { assertThat(total.getQueryTimeInMillis(), equalTo(0L)); } } - + assertThat(coordNumber, greaterThan(0)); assertThat(num, greaterThan(0)); } diff --git a/server/src/main/java/org/opensearch/action/CoordinatorStats.java b/server/src/main/java/org/opensearch/action/CoordinatorStats.java new file mode 100644 index 0000000000000..443a2649faefb --- /dev/null +++ b/server/src/main/java/org/opensearch/action/CoordinatorStats.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action; + +import org.opensearch.action.search.SearchCoordinatorStats; + +/** + * Cordinator level stats + * + * @opensearch.internal + */ +public final class CoordinatorStats { + + public SearchCoordinatorStats searchCoordinatorStats; + + public CoordinatorStats() { + searchCoordinatorStats = new SearchCoordinatorStats(); + } + + public SearchCoordinatorStats getSearchCoordinatorStats() { + return searchCoordinatorStats; + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java index e4abaef4ddfa8..435febbdc2211 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java @@ -33,6 +33,7 @@ package org.opensearch.action.admin.indices.stats; import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.action.CoordinatorStats; import org.opensearch.common.Nullable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -284,6 +285,13 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(recoveryStats); } + // Add all Coordinator Stats to the Search Stats from here + public void addCoordinatorStats(CoordinatorStats coordinatorStats) { + if (coordinatorStats.getSearchCoordinatorStats() != null && this.search != null) { + search.setSearchCoordinatorStats(coordinatorStats.getSearchCoordinatorStats()); + } + } + public void add(CommonStats stats) { if (docs == null) { if (stats.getDocs() != null) { diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 152e6f56668f2..595796319d2d8 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -68,6 +68,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; @@ -107,7 +108,6 @@ abstract class AbstractSearchAsyncAction exten private final AtomicInteger skippedOps = new AtomicInteger(); private final TransportSearchAction.SearchTimeProvider timeProvider; private final SearchResponse.Clusters clusters; - protected final GroupShardsIterator toSkipShardsIts; protected final GroupShardsIterator shardsIts; private final int expectedTotalOps; @@ -116,8 +116,13 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode = new ConcurrentHashMap<>(); private final boolean throttleConcurrentRequests; + private SearchPhase currentPhase; + private final List releasables = new ArrayList<>(); + private SearchRequestOperationsListener searchRequestOperationsListener; + private List searchListenersList; + AbstractSearchAsyncAction( String name, Logger logger, @@ -171,6 +176,12 @@ abstract class AbstractSearchAsyncAction exten this.indexRoutings = indexRoutings; this.results = resultConsumer; this.clusters = clusters; + + } + + public void setSearchListenerList(List searchListenersList) { + this.searchListenersList = searchListenersList; + this.searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(this.searchListenersList, logger); } @Override @@ -371,6 +382,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha : OpenSearchException.guessRootCauses(shardSearchFailures[0].getCause())[0]; logger.debug(() -> new ParameterizedMessage("All shards failed for phase: [{}]", getName()), cause); onPhaseFailure(currentPhase, "all shards failed", cause); + } else { Boolean allowPartialResults = request.allowPartialSearchResults(); assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults"; @@ -419,12 +431,56 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha clusterState.version() ); } + onPhaseEnd(this); executePhase(nextPhase); } } + public void onPhaseEnd(SearchPhaseContext searchPhaseContext) { + if (searchRequestOperationsListener == null) { + return; + } + long tookTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime()); + + if (searchPhaseContext.getCurrentPhase() instanceof SearchDfsQueryThenFetchAsyncAction) { + searchRequestOperationsListener.onDFSPreQueryPhaseEnd(searchPhaseContext, tookTimeInMillis); + } else if (searchPhaseContext.getCurrentPhase() instanceof CanMatchPreFilterSearchPhase) { + searchRequestOperationsListener.onCanMatchPhaseEnd(searchPhaseContext, tookTimeInMillis); + } else if (searchPhaseContext.getCurrentPhase() instanceof DfsQueryPhase) { + searchRequestOperationsListener.onQueryPhaseEnd(searchPhaseContext, tookTimeInMillis); + }else if (searchPhaseContext.getCurrentPhase() instanceof SearchQueryThenFetchAsyncAction) { + searchRequestOperationsListener.onQueryPhaseEnd(searchPhaseContext, tookTimeInMillis); + } else if (searchPhaseContext.getCurrentPhase() instanceof FetchSearchPhase) { + searchRequestOperationsListener.onFetchPhaseEnd(searchPhaseContext, tookTimeInMillis); + } else if (searchPhaseContext.getCurrentPhase() instanceof ExpandSearchPhase) { + searchRequestOperationsListener.onExpandSearchPhaseEnd(searchPhaseContext, tookTimeInMillis); + } + } + + public void onPhaseStart(SearchPhase phase, SearchPhaseContext searchPhaseContext) { + setCurrentPhase(phase); + phase.setStartTimeInNanos(System.nanoTime()); + if (searchRequestOperationsListener == null) { + return; + } + if (searchPhaseContext.getCurrentPhase() instanceof SearchDfsQueryThenFetchAsyncAction) { + searchRequestOperationsListener.onDFSPreQueryPhaseStart(searchPhaseContext); + } else if (searchPhaseContext.getCurrentPhase() instanceof CanMatchPreFilterSearchPhase) { + searchRequestOperationsListener.onCanMatchPhaseStart(searchPhaseContext); + } else if (searchPhaseContext.getCurrentPhase() instanceof DfsQueryPhase) { + searchRequestOperationsListener.onQueryPhaseStart(searchPhaseContext); + } else if (searchPhaseContext.getCurrentPhase() instanceof SearchQueryThenFetchAsyncAction) { + searchRequestOperationsListener.onQueryPhaseStart(searchPhaseContext); + } else if (searchPhaseContext.getCurrentPhase() instanceof FetchSearchPhase) { + searchRequestOperationsListener.onFetchPhaseStart(searchPhaseContext); + } else if (searchPhaseContext.getCurrentPhase() instanceof ExpandSearchPhase) { + searchRequestOperationsListener.onExpandSearchPhaseStart(searchPhaseContext); + } + } + private void executePhase(SearchPhase phase) { try { + onPhaseStart(phase, this); phase.run(); } catch (Exception e) { if (logger.isDebugEnabled()) { @@ -603,6 +659,14 @@ private void successfulShardExecution(SearchShardIterator shardsIt) { } } + public SearchPhase getCurrentPhase() { + return currentPhase; + } + + public void setCurrentPhase(SearchPhase phase) { + currentPhase = phase; + } + @Override public final int getNumShards() { return results.getNumShards(); @@ -670,10 +734,27 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At } listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId)); } + onPhaseEnd(this); + setCurrentPhase(null); } @Override public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) { + if (searchRequestOperationsListener != null) { + if (this.currentPhase instanceof SearchDfsQueryThenFetchAsyncAction) { + searchRequestOperationsListener.onDFSPreQueryPhaseFailure(this); + } else if (this.currentPhase instanceof CanMatchPreFilterSearchPhase) { + searchRequestOperationsListener.onCanMatchPhaseFailure(this); + } else if (this.currentPhase instanceof DfsQueryPhase) { + searchRequestOperationsListener.onQueryPhaseFailure(this); + } else if (this.currentPhase instanceof SearchQueryThenFetchAsyncAction) { + searchRequestOperationsListener.onQueryPhaseFailure(this); + } else if (this.currentPhase instanceof FetchSearchPhase) { + searchRequestOperationsListener.onFetchPhaseFailure(this); + } else if (this.currentPhase instanceof ExpandSearchPhase) { + searchRequestOperationsListener.onExpandSearchPhaseFailure(this); + } + } raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures())); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchCoordinatorStats.java b/server/src/main/java/org/opensearch/action/search/SearchCoordinatorStats.java new file mode 100644 index 0000000000000..66556f5587174 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchCoordinatorStats.java @@ -0,0 +1,208 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.common.inject.Inject; +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.common.metrics.MeanMetric; + +import java.util.function.Consumer; + +/** + * Coordinator level search stats + * + * @opensearch.internal + */ +public final class SearchCoordinatorStats implements SearchRequestOperationsListener { + public StatsHolder totalStats = new StatsHolder(); + + // private final CounterMetric openContexts = new CounterMetric(); + + // private volatile Map groupStats = emptyMap(); + + @Inject + public SearchCoordinatorStats() {} + + public long getDFSPreQueryMetric() { + return totalStats.dfsPreQueryMetric.sum(); + } + + public long getDFSPreQueryCurrent() { + return totalStats.dfsPreQueryCurrent.count(); + } + + public long getDFSPreQueryTotal() { + return totalStats.dfsPreQueryTotal.count(); + } + + public long getCanMatchMetric() { + return totalStats.canMatchMetric.sum(); + } + + public long getCanMatchCurrent() { + return totalStats.canMatchCurrent.count(); + } + + public long getCanMatchTotal() { + return totalStats.canMatchTotal.count(); + } + + public long getQueryMetric() { + return totalStats.queryMetric.sum(); + } + + public long getQueryCurrent() { + return totalStats.queryCurrent.count(); + } + + public long getQueryTotal() { + return totalStats.queryTotal.count(); + } + + public long getFetchMetric() { + return totalStats.fetchMetric.sum(); + } + + public long getFetchCurrent() { + return totalStats.fetchCurrent.count(); + } + + public long getFetchTotal() { + return totalStats.fetchTotal.count(); + } + + public long getExpandSearchMetric() { + return totalStats.expandSearchMetric.sum(); + } + + public long getExpandSearchCurrent() { + return totalStats.expandSearchCurrent.count(); + } + + public long getExpandSearchTotal() { + return totalStats.expandSearchTotal.count(); + } + + private void computeStats(SearchPhaseContext searchPhaseContext, Consumer consumer) { + consumer.accept(totalStats); + } + + @Override + public void onDFSPreQueryPhaseStart(SearchPhaseContext context) { + computeStats(context, statsHolder -> { statsHolder.dfsPreQueryCurrent.inc(); }); + } + + @Override + public void onDFSPreQueryPhaseEnd(SearchPhaseContext context, long tookTime) { + computeStats(context, statsHolder -> { + totalStats.dfsPreQueryCurrent.dec(); + totalStats.dfsPreQueryTotal.inc(); + totalStats.dfsPreQueryMetric.inc(tookTime); + }); + } + + @Override + public void onDFSPreQueryPhaseFailure(SearchPhaseContext context) { + return; + } + + @Override + public void onCanMatchPhaseStart(SearchPhaseContext context) { + computeStats(context, statsHolder -> { statsHolder.canMatchCurrent.inc(); }); + } + + @Override + public void onCanMatchPhaseEnd(SearchPhaseContext context, long tookTime) { + computeStats(context, statsHolder -> { + totalStats.canMatchCurrent.dec(); + totalStats.canMatchTotal.inc(); + totalStats.canMatchMetric.inc(tookTime); + }); + } + + @Override + public void onCanMatchPhaseFailure(SearchPhaseContext context) { + return; + } + + @Override + public void onQueryPhaseStart(SearchPhaseContext context) { + computeStats(context, statsHolder -> { statsHolder.queryCurrent.inc(); }); + } + + @Override + public void onQueryPhaseEnd(SearchPhaseContext context, long tookTime) { + computeStats(context, statsHolder -> { + totalStats.queryCurrent.dec(); + totalStats.queryTotal.inc(); + totalStats.queryMetric.inc(tookTime); + }); + } + + @Override + public void onQueryPhaseFailure(SearchPhaseContext context) { + return; + } + + @Override + public void onFetchPhaseStart(SearchPhaseContext context) { + computeStats(context, statsHolder -> { totalStats.fetchCurrent.inc(); }); + } + + @Override + public void onFetchPhaseEnd(SearchPhaseContext context, long tookTime) { + computeStats(context, statsHolder -> { + totalStats.fetchCurrent.dec(); + totalStats.fetchTotal.inc(); + totalStats.fetchMetric.inc(tookTime); + }); + } + + @Override + public void onFetchPhaseFailure(SearchPhaseContext context) { + return; + } + + @Override + public void onExpandSearchPhaseStart(SearchPhaseContext context) { + computeStats(context, statsHolder -> { totalStats.expandSearchCurrent.inc(); }); + } + + @Override + public void onExpandSearchPhaseEnd(SearchPhaseContext context, long tookTime) { + computeStats(context, statsHolder -> { + totalStats.expandSearchCurrent.dec(); + totalStats.expandSearchTotal.inc(); + totalStats.expandSearchMetric.inc(tookTime); + }); + } + + @Override + public void onExpandSearchPhaseFailure(SearchPhaseContext context) { + return; + } + + public static final class StatsHolder { + public MeanMetric dfsPreQueryMetric = new MeanMetric(); + public CounterMetric dfsPreQueryCurrent = new CounterMetric(); + public CounterMetric dfsPreQueryTotal = new CounterMetric(); + public MeanMetric canMatchMetric = new MeanMetric(); + public CounterMetric canMatchCurrent = new CounterMetric(); + public CounterMetric canMatchTotal = new CounterMetric(); + public MeanMetric queryMetric = new MeanMetric(); + public CounterMetric queryCurrent = new CounterMetric(); + public CounterMetric queryTotal = new CounterMetric(); + public MeanMetric fetchMetric = new MeanMetric(); + public CounterMetric fetchCurrent = new CounterMetric(); + public CounterMetric fetchTotal = new CounterMetric(); + public MeanMetric expandSearchMetric = new MeanMetric(); + public CounterMetric expandSearchCurrent = new CounterMetric(); + public CounterMetric expandSearchTotal = new CounterMetric(); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhase.java b/server/src/main/java/org/opensearch/action/search/SearchPhase.java index 50b0cd8e01c1d..7f833ac378152 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhase.java @@ -44,11 +44,20 @@ */ abstract class SearchPhase implements CheckedRunnable { private final String name; + private long startTime; protected SearchPhase(String name) { this.name = Objects.requireNonNull(name, "name must not be null"); } + public void setStartTimeInNanos(long startTime) { + this.startTime = startTime; + } + + public long getStartTime() { + return startTime; + } + /** * Returns the phases name. */ diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java b/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java index 018035f21179b..7a3ba8efa379b 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java @@ -73,6 +73,8 @@ public interface SearchPhaseContext extends Executor { */ SearchRequest getRequest(); + SearchPhase getCurrentPhase(); + /** * Builds and sends the final search response back to the user. * diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java new file mode 100644 index 0000000000000..35014d08b26b1 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -0,0 +1,273 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * A listener for search, fetch and context events at the coordinator node level + * + * @opensearch.internal + */ +public interface SearchRequestOperationsListener { + + /** + * Executed when the request is started + * @param context the current searchPhase context + */ + // void onRequestStart(SearchPhaseContext context); + + /** + * Executed when the request is ended + * @param context the current searchPhase context + */ + // void onRequestEnd(SearchPhaseContext context); + + /** + * Executed when the query phase is started + */ + void onDFSPreQueryPhaseStart(SearchPhaseContext context); + + void onDFSPreQueryPhaseFailure(SearchPhaseContext context); + + void onDFSPreQueryPhaseEnd(SearchPhaseContext context, long tookTime); + + void onCanMatchPhaseStart(SearchPhaseContext context); + + void onCanMatchPhaseFailure(SearchPhaseContext context); + + void onCanMatchPhaseEnd(SearchPhaseContext context, long tookTime); + + void onQueryPhaseStart(SearchPhaseContext context); + + void onQueryPhaseFailure(SearchPhaseContext context); + + void onQueryPhaseEnd(SearchPhaseContext context, long tookTime); + + void onFetchPhaseStart(SearchPhaseContext context); + + void onFetchPhaseFailure(SearchPhaseContext context); + + void onFetchPhaseEnd(SearchPhaseContext context, long tookTime); + + void onExpandSearchPhaseStart(SearchPhaseContext context); + + void onExpandSearchPhaseFailure(SearchPhaseContext context); + + void onExpandSearchPhaseEnd(SearchPhaseContext context, long tookTime); + + final class CompositeListener implements SearchRequestOperationsListener { + private final List listeners; + private final Logger logger; + private long canMatchPhaseStart; + private long canMatchPhaseEnd; + private long dfsPreQueryPhaseStart; + private long dfsPreQueryPhaseEnd; + private long queryPhaseStart; + private long queryPhaseEnd; + private long fetchPhaseStart; + private long fetchPhaseEnd; + private long expandSearchPhaseStart; + private long expandSearchPhaseEnd; + private long dfsPreQueryTotal; + private long canMatchTotal; + private long queryTotal; + private long fetchTotal; + private long expandSearchTotal; + + public CompositeListener(List listeners, Logger logger) { + this.listeners = listeners; + this.logger = logger; + } + + @Override + public void onDFSPreQueryPhaseStart(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + dfsPreQueryPhaseStart = System.nanoTime(); + listener.onDFSPreQueryPhaseStart(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e); + } + } + } + + @Override + public void onDFSPreQueryPhaseEnd(SearchPhaseContext context, long tookTime) { + for (SearchRequestOperationsListener listener : listeners) { + try { + dfsPreQueryPhaseEnd = System.nanoTime(); + dfsPreQueryTotal = TimeUnit.NANOSECONDS.toMillis(dfsPreQueryPhaseEnd - dfsPreQueryPhaseStart); + listener.onDFSPreQueryPhaseEnd(context, tookTime); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseEnd listener [{}] failed", listener), e); + } + } + } + + @Override + public void onDFSPreQueryPhaseFailure(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onDFSPreQueryPhaseFailure(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e); + } + } + } + + @Override + public void onCanMatchPhaseStart(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + canMatchPhaseStart = System.nanoTime(); + listener.onCanMatchPhaseStart(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e); + } + } + } + + @Override + public void onCanMatchPhaseEnd(SearchPhaseContext context, long tookTime) { + for (SearchRequestOperationsListener listener : listeners) { + try { + canMatchPhaseEnd = System.nanoTime(); + canMatchTotal = TimeUnit.NANOSECONDS.toMillis(canMatchPhaseEnd - canMatchPhaseStart); + listener.onCanMatchPhaseEnd(context, tookTime); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseEnd listener [{}] failed", listener), e); + } + } + } + + @Override + public void onCanMatchPhaseFailure(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onCanMatchPhaseFailure(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e); + } + } + } + + @Override + public void onQueryPhaseStart(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + queryPhaseStart = System.nanoTime(); + listener.onQueryPhaseStart(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e); + } + } + } + + @Override + public void onQueryPhaseEnd(SearchPhaseContext context, long tookTime) { + for (SearchRequestOperationsListener listener : listeners) { + try { + queryPhaseEnd = System.nanoTime(); + queryTotal = TimeUnit.NANOSECONDS.toMillis(queryPhaseEnd - queryPhaseStart); + listener.onQueryPhaseEnd(context, tookTime); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseEnd listener [{}] failed", listener), e); + } + } + } + + @Override + public void onQueryPhaseFailure(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onQueryPhaseFailure(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e); + } + } + } + + @Override + public void onFetchPhaseStart(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + fetchPhaseStart = System.nanoTime(); + listener.onFetchPhaseStart(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onFetchStart listener [{}] failed", listener), e); + } + } + } + + @Override + public void onFetchPhaseEnd(SearchPhaseContext context, long tookTime) { + for (SearchRequestOperationsListener listener : listeners) { + try { + fetchPhaseEnd = System.nanoTime(); + fetchTotal = TimeUnit.NANOSECONDS.toMillis(fetchPhaseEnd - fetchPhaseStart); + listener.onFetchPhaseEnd(context, tookTime); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onFetchEnd listener [{}] failed", listener), e); + } + } + } + + @Override + public void onFetchPhaseFailure(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onFetchPhaseFailure(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onFetchFailure listener [{}] failed", listener), e); + } + } + } + + @Override + public void onExpandSearchPhaseStart(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + expandSearchPhaseStart = System.nanoTime(); + listener.onExpandSearchPhaseStart(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onExpandSearchStart listener [{}] failed", listener), e); + } + } + } + + @Override + public void onExpandSearchPhaseEnd(SearchPhaseContext context, long tookTime) { + for (SearchRequestOperationsListener listener : listeners) { + try { + expandSearchPhaseEnd = System.nanoTime(); + expandSearchTotal = TimeUnit.NANOSECONDS.toMillis(expandSearchPhaseEnd - expandSearchPhaseStart); + listener.onExpandSearchPhaseEnd(context, tookTime); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onExpandSearchEnd listener [{}] failed", listener), e); + } + } + } + + @Override + public void onExpandSearchPhaseFailure(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onExpandSearchPhaseFailure(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onExpandSearchFailure listener [{}] failed", listener), e); + } + } + } + } +} diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 1011f17c98dd6..907c9b4cabc90 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -156,6 +156,7 @@ public class TransportSearchAction extends HandledTransportAction searchListenersList = new ArrayList<>(); @Inject public TransportSearchAction( @@ -170,7 +171,8 @@ public TransportSearchAction( ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, NamedWriteableRegistry namedWriteableRegistry, - SearchPipelineService searchPipelineService + SearchPipelineService searchPipelineService, + SearchCoordinatorStats searchCoordinatorStats ) { super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader) SearchRequest::new); this.client = client; @@ -185,6 +187,7 @@ public TransportSearchAction( this.indexNameExpressionResolver = indexNameExpressionResolver; this.namedWriteableRegistry = namedWriteableRegistry; this.searchPipelineService = searchPipelineService; + this.searchListenersList.add(searchCoordinatorStats); } private Map buildPerIndexAliasFilter( @@ -329,25 +332,26 @@ public AbstractSearchAsyncAction asyncSearchAction( ThreadPool threadPool, SearchResponse.Clusters clusters ) { - return new AbstractSearchAsyncAction( - actionName, - logger, - searchTransportService, - connectionLookup, - aliasFilter, - concreteIndexBoosts, - indexRoutings, - executor, - searchRequest, - listener, - shardsIts, - timeProvider, - clusterState, - task, - new ArraySearchPhaseResults<>(shardsIts.size()), - searchRequest.getMaxConcurrentShardRequests(), - clusters - ) { + AbstractSearchAsyncAction returnAbstractSearchAsyncAction = new AbstractSearchAsyncAction< + SearchPhaseResult>( + actionName, + logger, + searchTransportService, + connectionLookup, + aliasFilter, + concreteIndexBoosts, + indexRoutings, + executor, + searchRequest, + listener, + shardsIts, + timeProvider, + clusterState, + task, + new ArraySearchPhaseResults<>(shardsIts.size()), + searchRequest.getMaxConcurrentShardRequests(), + clusters + ) { @Override protected void executePhaseOnShard( SearchShardIterator shardIt, @@ -374,6 +378,8 @@ boolean buildPointInTimeFromSearchResults() { return includeSearchContext; } }; + returnAbstractSearchAsyncAction.setSearchListenerList(searchListenersList); + return returnAbstractSearchAsyncAction; } }, listener); } @@ -1218,6 +1224,7 @@ public void run() { default: throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]"); } + searchAsyncAction.setSearchListenerList(searchListenersList); return searchAsyncAction; } } diff --git a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java index 5c48c1f772ff0..a20b30dd7d27b 100644 --- a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java +++ b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java @@ -62,4 +62,6 @@ public void dec(long n) { public long count() { return counter.sum(); } + + } diff --git a/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java b/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java index 33f12c8cb42d3..359facdce633b 100644 --- a/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java +++ b/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java @@ -79,4 +79,5 @@ public void clear() { counter.reset(); sum.reset(); } + } diff --git a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java index b5781de7cbd62..150368c629ef8 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java @@ -33,6 +33,8 @@ package org.opensearch.index.search.stats; import org.opensearch.Version; +import org.opensearch.action.CoordinatorStats; +import org.opensearch.action.search.SearchCoordinatorStats; import org.opensearch.common.Nullable; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; @@ -61,6 +63,24 @@ public class SearchStats implements Writeable, ToXContentFragment { * * @opensearch.internal */ + + public static class CoordinatorStatsLongHolder { + public long dfsPreQueryMetric; + public long dfsPreQueryCurrent; + public long dfsPreQueryTotal; + public long canMatchMetric; + public long canMatchCurrent; + public long canMatchTotal; + public long queryMetric; + public long queryCurrent; + public long queryTotal; + public long fetchMetric; + public long fetchCurrent; + public long fetchTotal; + public long expandSearchMetric; + public long expandSearchCurrent; + public long expandSearchTotal; + } public static class Stats implements Writeable, ToXContentFragment { private long queryCount; @@ -83,6 +103,16 @@ public static class Stats implements Writeable, ToXContentFragment { private long pitTimeInMillis; private long pitCurrent; + @Nullable + public CoordinatorStatsLongHolder coordinatorStatsLongHolder; + + public CoordinatorStatsLongHolder getCoordinatorStatsLongHolder() { + if (coordinatorStatsLongHolder == null) { + return null; + } + return coordinatorStatsLongHolder; + } + private Stats() { // for internal use, initializes all counts to 0 } @@ -104,6 +134,7 @@ public Stats( long suggestTimeInMillis, long suggestCurrent ) { + this.coordinatorStatsLongHolder = new CoordinatorStatsLongHolder(); this.queryCount = queryCount; this.queryTimeInMillis = queryTimeInMillis; this.queryCurrent = queryCurrent; @@ -147,6 +178,29 @@ private Stats(StreamInput in) throws IOException { pitTimeInMillis = in.readVLong(); pitCurrent = in.readVLong(); } + + if (in.getVersion().onOrAfter(Version.V_2_0_0)) { + this.coordinatorStatsLongHolder = new CoordinatorStatsLongHolder(); + coordinatorStatsLongHolder.dfsPreQueryMetric = in.readVLong(); + coordinatorStatsLongHolder.dfsPreQueryCurrent= in.readVLong(); + coordinatorStatsLongHolder.dfsPreQueryTotal = in.readVLong(); + + coordinatorStatsLongHolder.canMatchMetric = in.readVLong(); + coordinatorStatsLongHolder.canMatchCurrent= in.readVLong(); + coordinatorStatsLongHolder.canMatchTotal = in.readVLong(); + + coordinatorStatsLongHolder.queryMetric = in.readVLong(); + coordinatorStatsLongHolder.queryCurrent= in.readVLong(); + coordinatorStatsLongHolder.queryTotal = in.readVLong(); + + coordinatorStatsLongHolder.fetchMetric = in.readVLong(); + coordinatorStatsLongHolder.fetchCurrent= in.readVLong(); + coordinatorStatsLongHolder.fetchTotal = in.readVLong(); + + coordinatorStatsLongHolder.expandSearchMetric = in.readVLong(); + coordinatorStatsLongHolder.expandSearchCurrent= in.readVLong(); + coordinatorStatsLongHolder.expandSearchTotal = in.readVLong(); + } } public void add(Stats stats) { @@ -298,6 +352,28 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(pitTimeInMillis); out.writeVLong(pitCurrent); } + + if (out.getVersion().onOrAfter(Version.V_2_0_0)) { + out.writeVLong(coordinatorStatsLongHolder.dfsPreQueryMetric); + out.writeVLong(coordinatorStatsLongHolder.dfsPreQueryCurrent); + out.writeVLong(coordinatorStatsLongHolder.dfsPreQueryTotal); + + out.writeVLong(coordinatorStatsLongHolder.canMatchMetric); + out.writeVLong(coordinatorStatsLongHolder.canMatchCurrent); + out.writeVLong(coordinatorStatsLongHolder.canMatchTotal); + + out.writeVLong(coordinatorStatsLongHolder.queryMetric); + out.writeVLong(coordinatorStatsLongHolder.queryCurrent); + out.writeVLong(coordinatorStatsLongHolder.queryTotal); + + out.writeVLong(coordinatorStatsLongHolder.fetchMetric); + out.writeVLong(coordinatorStatsLongHolder.fetchCurrent); + out.writeVLong(coordinatorStatsLongHolder.fetchTotal); + + out.writeVLong(coordinatorStatsLongHolder.expandSearchMetric); + out.writeVLong(coordinatorStatsLongHolder.expandSearchCurrent); + out.writeVLong(coordinatorStatsLongHolder.expandSearchTotal); + } } @Override @@ -322,6 +398,51 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime()); builder.field(Fields.SUGGEST_CURRENT, suggestCurrent); + if (coordinatorStatsLongHolder != null) { + builder.startObject(Fields.COORDINATOR); + + builder.humanReadableField( + Fields.DFS_PREQUERY_TIME_IN_MILLIS, + Fields.QUERY_TIME, + new TimeValue(coordinatorStatsLongHolder.dfsPreQueryMetric) + ); + builder.field(Fields.DFS_PREQUERY_CURRENT, coordinatorStatsLongHolder.dfsPreQueryCurrent); + builder.field(Fields.DFS_PREQUERY_TOTAL, coordinatorStatsLongHolder.dfsPreQueryTotal); + + builder.humanReadableField( + Fields.CANMATCH_TIME_IN_MILLIS, + Fields.QUERY_TIME, + new TimeValue(coordinatorStatsLongHolder.canMatchMetric) + ); + builder.field(Fields.CANMATCH_CURRENT, coordinatorStatsLongHolder.canMatchCurrent); + builder.field(Fields.CANMATCH_TOTAL, coordinatorStatsLongHolder.canMatchTotal); + + builder.humanReadableField( + Fields.QUERY_TIME_IN_MILLIS, + Fields.QUERY_TIME, + new TimeValue(coordinatorStatsLongHolder.queryMetric) + ); + builder.field(Fields.QUERY_CURRENT, coordinatorStatsLongHolder.queryCurrent); + builder.field(Fields.QUERY_TOTAL, coordinatorStatsLongHolder.queryTotal); + + builder.humanReadableField( + Fields.FETCH_TIME_IN_MILLIS, + Fields.FETCH_TIME, + new TimeValue(coordinatorStatsLongHolder.fetchMetric) + ); + builder.field(Fields.FETCH_CURRENT, coordinatorStatsLongHolder.fetchCurrent); + builder.field(Fields.FETCH_TOTAL, coordinatorStatsLongHolder.fetchTotal); + + builder.humanReadableField( + Fields.EXPAND_TIME_IN_MILLIS, + Fields.FETCH_TIME, + new TimeValue(coordinatorStatsLongHolder.expandSearchMetric) + ); + builder.field(Fields.EXPAND_CURRENT, coordinatorStatsLongHolder.expandSearchCurrent); + builder.field(Fields.EXPAND_TOTAL, coordinatorStatsLongHolder.expandSearchTotal); + + builder.endObject(); + } return builder; } } @@ -336,6 +457,32 @@ public SearchStats() { totalStats = new Stats(); } + // Set the different Coordinator Stats fields in here + public void setSearchCoordinatorStats(SearchCoordinatorStats searchCoordinatorStats) { + if (totalStats.coordinatorStatsLongHolder == null) { + totalStats.coordinatorStatsLongHolder = new CoordinatorStatsLongHolder(); + } + totalStats.coordinatorStatsLongHolder.dfsPreQueryMetric = searchCoordinatorStats.getDFSPreQueryMetric(); + totalStats.coordinatorStatsLongHolder.dfsPreQueryCurrent = searchCoordinatorStats.getDFSPreQueryCurrent(); + totalStats.coordinatorStatsLongHolder.dfsPreQueryTotal = searchCoordinatorStats.getDFSPreQueryTotal(); + + totalStats.coordinatorStatsLongHolder.canMatchMetric = searchCoordinatorStats.getCanMatchMetric(); + totalStats.coordinatorStatsLongHolder.canMatchCurrent = searchCoordinatorStats.getCanMatchCurrent(); + totalStats.coordinatorStatsLongHolder.canMatchTotal = searchCoordinatorStats.getCanMatchTotal(); + + totalStats.coordinatorStatsLongHolder.queryMetric = searchCoordinatorStats.getQueryMetric(); + totalStats.coordinatorStatsLongHolder.queryCurrent = searchCoordinatorStats.getQueryCurrent(); + totalStats.coordinatorStatsLongHolder.queryTotal = searchCoordinatorStats.getQueryTotal(); + + totalStats.coordinatorStatsLongHolder.fetchMetric = searchCoordinatorStats.getFetchMetric(); + totalStats.coordinatorStatsLongHolder.fetchCurrent = searchCoordinatorStats.getFetchCurrent(); + totalStats.coordinatorStatsLongHolder.fetchTotal = searchCoordinatorStats.getFetchTotal(); + + totalStats.coordinatorStatsLongHolder.expandSearchMetric = searchCoordinatorStats.getExpandSearchMetric(); + totalStats.coordinatorStatsLongHolder.expandSearchCurrent = searchCoordinatorStats.getExpandSearchCurrent(); + totalStats.coordinatorStatsLongHolder.expandSearchTotal = searchCoordinatorStats.getExpandSearchTotal(); + } + public SearchStats(Stats totalStats, long openContexts, @Nullable Map groupStats) { this.totalStats = totalStats; this.openContexts = openContexts; @@ -446,6 +593,16 @@ static final class Fields { static final String SUGGEST_TIME = "suggest_time"; static final String SUGGEST_TIME_IN_MILLIS = "suggest_time_in_millis"; static final String SUGGEST_CURRENT = "suggest_current"; + static final String COORDINATOR = "coordinator"; + static final String DFS_PREQUERY_TIME_IN_MILLIS = "dfs_prequery_time_in_millis"; + static final String DFS_PREQUERY_CURRENT = "dfs_prequery_current"; + static final String DFS_PREQUERY_TOTAL = "dfs_prequery_total"; + static final String CANMATCH_TIME_IN_MILLIS = "canmatch_time_in_millis"; + static final String CANMATCH_CURRENT = "canmatch_current"; + static final String CANMATCH_TOTAL = "canmatch_total"; + static final String EXPAND_TIME_IN_MILLIS = "expand_time_in_millis"; + static final String EXPAND_CURRENT = "expand_current"; + static final String EXPAND_TOTAL = "expand_total"; } @Override diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 844f4fec706f3..02a156da41a73 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -42,6 +42,7 @@ import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.OpenSearchException; import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.action.CoordinatorStats; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.admin.indices.stats.CommonStatsFlags.Flag; @@ -321,6 +322,8 @@ public class IndicesService extends AbstractLifecycleComponent private final FileCacheCleaner fileCacheCleaner; + private final CoordinatorStats coordinatorStats; + @Override protected void doStart() { // Start thread that will manage cleaning the field data cache periodically @@ -350,7 +353,8 @@ public IndicesService( Map recoveryStateFactories, IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, Supplier repositoriesServiceSupplier, - FileCacheCleaner fileCacheCleaner + FileCacheCleaner fileCacheCleaner, + CoordinatorStats coordinatorStats ) { this.settings = settings; this.threadPool = threadPool; @@ -440,6 +444,7 @@ protected void closeInternal() { clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); this.remoteDirectoryFactory = remoteDirectoryFactory; this.translogFactorySupplier = getTranslogFactorySupplier(repositoriesServiceSupplier, threadPool); + this.coordinatorStats = coordinatorStats; } private static BiFunction getTranslogFactorySupplier( @@ -539,7 +544,7 @@ public NodeIndicesStats stats(CommonStatsFlags flags) { } } - return new NodeIndicesStats(commonStats, statsByShard(this, flags)); + return new NodeIndicesStats(commonStats, statsByShard(this, flags), coordinatorStats); } Map> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) { diff --git a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java index 482cd07543051..7278b591f3762 100644 --- a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java @@ -32,6 +32,7 @@ package org.opensearch.indices; +import org.opensearch.action.CoordinatorStats; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.IndexShardStats; import org.opensearch.action.admin.indices.stats.ShardStats; @@ -71,7 +72,6 @@ * @opensearch.internal */ public class NodeIndicesStats implements Writeable, ToXContentFragment { - private CommonStats stats; private Map> statsByShard; @@ -92,7 +92,7 @@ public NodeIndicesStats(StreamInput in) throws IOException { } } - public NodeIndicesStats(CommonStats oldStats, Map> statsByShard) { + public NodeIndicesStats(CommonStats oldStats, Map> statsByShard, CoordinatorStats coordinatorStats) { // this.stats = stats; this.statsByShard = statsByShard; @@ -105,6 +105,7 @@ public NodeIndicesStats(CommonStats oldStats, Map> } } } + this.stats.addCoordinatorStats(coordinatorStats); } @Nullable diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 60c95a04a042f..ddcb515dd4172 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -36,6 +36,8 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.Constants; import org.opensearch.ExceptionsHelper; +import org.opensearch.action.CoordinatorStats; +import org.opensearch.action.search.SearchCoordinatorStats; import org.opensearch.common.SetOnce; import org.opensearch.common.settings.SettingsException; import org.opensearch.core.common.unit.ByteSizeUnit; @@ -721,6 +723,8 @@ protected Node( threadPool ); + final CoordinatorStats coordinatorStats = new CoordinatorStats(); + final IndicesService indicesService = new IndicesService( settings, pluginsService, @@ -744,7 +748,8 @@ protected Node( recoveryStateFactories, remoteDirectoryFactory, repositoriesServiceReference::get, - fileCacheCleaner + fileCacheCleaner, + coordinatorStats ); final AliasValidator aliasValidator = new AliasValidator(); @@ -1147,6 +1152,7 @@ protected Node( b.bind(SystemIndices.class).toInstance(systemIndices); b.bind(IdentityService.class).toInstance(identityService); b.bind(Tracer.class).toInstance(tracer); + b.bind(SearchCoordinatorStats.class).toInstance(coordinatorStats.getSearchCoordinatorStats()); }); injector = modules.createInjector(); diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index d906c7be15a15..41e257d1b7840 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -34,26 +34,38 @@ import org.junit.After; import org.junit.Before; +import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.OriginalIndices; import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.common.UUIDs; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.NoopCircuitBreaker; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.util.concurrent.AtomicArray; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.set.Sets; import org.opensearch.core.index.Index; +import org.opensearch.core.common.Strings; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.ShardNotFoundException; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.search.sort.SortBuilders; +import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transport; import java.util.ArrayList; @@ -61,14 +73,18 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -83,12 +99,14 @@ public class AbstractSearchAsyncActionTests extends OpenSearchTestCase { private final List> resolvedNodes = new ArrayList<>(); private final Set releasedContexts = new CopyOnWriteArraySet<>(); private ExecutorService executor; + ThreadPool threadPool; @Before @Override public void setUp() throws Exception { super.setUp(); executor = Executors.newFixedThreadPool(1); + threadPool = new TestThreadPool(getClass().getName()); } @After @@ -97,6 +115,7 @@ public void tearDown() throws Exception { super.tearDown(); executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS)); + ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS); } private AbstractSearchAsyncAction createAction( @@ -126,6 +145,7 @@ private AbstractSearchAsyncAction createAction( final AtomicLong expected, final SearchShardIterator... shards ) { + final Runnable runnable; final TransportSearchAction.SearchTimeProvider timeProvider; if (controlled) { @@ -528,9 +548,339 @@ public void onFailure(Exception e) { assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length)); } + public void testSearchRequestListeners() { + SearchCoordinatorStatsTesting searchCoordinatorStatsTesting = new SearchCoordinatorStatsTesting(); + SearchRequestOperationsListener testListener = createSearchRequestOperationsListener(searchCoordinatorStatsTesting); + final List requestOperationListeners = new ArrayList<>(Arrays.asList(testListener, testListener)); + + SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(); + action.setSearchListenerList(requestOperationListeners); + + SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(); + searchDfsQueryThenFetchAsyncAction.setSearchListenerList(requestOperationListeners); + + CanMatchPreFilterSearchPhase canMatchPreFilterSearchPhaseAction = createCanMatchPreFilterSearchPhase(); + canMatchPreFilterSearchPhaseAction.setSearchListenerList(requestOperationListeners); + + action.start(); + assertEquals(2, searchCoordinatorStatsTesting.queryPhaseStart.get()); + + FetchSearchPhase fetchPhase = createFetchSearchPhase(); + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); + searchShardIterator.resetAndSkip(); + action.skipShard(searchShardIterator); + action.executeNextPhase(action, fetchPhase); + + assertEquals(2, searchCoordinatorStatsTesting.queryPhaseEnd.get()); + + searchDfsQueryThenFetchAsyncAction.start(); + searchDfsQueryThenFetchAsyncAction.skipShard(searchShardIterator); + searchDfsQueryThenFetchAsyncAction.executeNextPhase(searchDfsQueryThenFetchAsyncAction, fetchPhase); + + canMatchPreFilterSearchPhaseAction.start(); + + assertEquals(2, searchCoordinatorStatsTesting.dfsPreQueryPhaseStart.get()); + assertEquals(2, searchCoordinatorStatsTesting.dfsPreQueryPhaseEnd.get()); + assertEquals(2, searchCoordinatorStatsTesting.canMatchPhaseStart.get()); + assertEquals(4, searchCoordinatorStatsTesting.fetchPhaseStart.get()); + + ExpandSearchPhase expandPhase = createExpandSearchPhase(); + action.executeNextPhase(fetchPhase, expandPhase); + assertEquals(2, searchCoordinatorStatsTesting.expandPhaseStart.get()); + + action.sendSearchResponse(InternalSearchResponse.empty(), null); + } + + private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction() { + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), + r -> InternalAggregationTestCase.emptyReduceContextBuilder() + ); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + Executor executor = OpenSearchExecutors.newDirectExecutorService(); + SearchShardIterator shards = new SearchShardIterator(null, null, Collections.emptyList(), null); + GroupShardsIterator shardsIter = new GroupShardsIterator<>(List.of(shards)); + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( + searchRequest, + executor, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + controller, + task.getProgressListener(), + writableRegistry(), + shardsIter.size(), + exc -> {} + ); + return new SearchDfsQueryThenFetchAsyncAction( + logger, + null, + null, + null, + null, + null, + null, + executor, + resultConsumer, + searchRequest, + null, + shardsIter, + null, + null, + task, + SearchResponse.Clusters.EMPTY + ); + } + + private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction() { + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), + r -> InternalAggregationTestCase.emptyReduceContextBuilder() + ); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + Executor executor = OpenSearchExecutors.newDirectExecutorService(); + SearchShardIterator shards = new SearchShardIterator(null, null, Collections.emptyList(), null); + GroupShardsIterator shardsIter = new GroupShardsIterator<>(List.of(shards)); + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( + searchRequest, + executor, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + controller, + task.getProgressListener(), + writableRegistry(), + shardsIter.size(), + exc -> {} + ); + return new SearchQueryThenFetchAsyncAction( + logger, + null, + null, + null, + null, + null, + null, + executor, + resultConsumer, + searchRequest, + null, + shardsIter, + null, + null, + task, + SearchResponse.Clusters.EMPTY + ) { + @Override + ShardSearchFailure[] buildShardFailures() { + return ShardSearchFailure.EMPTY_ARRAY; + } + + @Override + public void sendSearchResponse(InternalSearchResponse internalSearchResponse, AtomicArray queryResults) { + onPhaseEnd(this); + setCurrentPhase(null); + } + }; + } + + private CanMatchPreFilterSearchPhase createCanMatchPreFilterSearchPhase() { + final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider( + 0, + System.nanoTime(), + System::nanoTime + ); + SearchTransportService searchTransportService = new SearchTransportService(null, null); + Map lookup = new ConcurrentHashMap<>(); + final SearchRequest searchRequest = new SearchRequest(); + searchRequest.allowPartialSearchResults(false); + + int numConcurrent = randomIntBetween(1, 4); + searchRequest.setMaxConcurrentShardRequests(numConcurrent); + searchRequest.setBatchedReduceSize(2); + searchRequest.source(new SearchSourceBuilder().size(1).sort(SortBuilders.fieldSort("timestamp"))); + int numShards = 1; + DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + GroupShardsIterator shardsIter = SearchAsyncActionTests.getShardsIter( + "idx", + new OriginalIndices(new String[] { "idx" }, SearchRequest.DEFAULT_INDICES_OPTIONS), + numShards, + randomBoolean(), + primaryNode, + replicaNode + ); + return new CanMatchPreFilterSearchPhase( + logger, + searchTransportService, + (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), + Collections.emptyMap(), + Collections.emptyMap(), + OpenSearchExecutors.newDirectExecutorService(), + searchRequest, + null, + shardsIter, + timeProvider, + ClusterState.EMPTY_STATE, + null, + null, + SearchResponse.Clusters.EMPTY + ); + } + + private FetchSearchPhase createFetchSearchPhase() { + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), + r -> InternalAggregationTestCase.emptyReduceContextBuilder() + ); + MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); + QueryPhaseResultConsumer results = controller.newSearchPhaseResults( + OpenSearchExecutors.newDirectExecutorService(), + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + SearchProgressListener.NOOP, + mockSearchPhaseContext.getRequest(), + 1, + exc -> {} + ); + return new FetchSearchPhase( + results, + controller, + null, + mockSearchPhaseContext, + (searchResponse, scrollId) -> new SearchPhase("test") { + @Override + public void run() { + mockSearchPhaseContext.sendSearchResponse(searchResponse, null); + } + } + ); + } + + private ExpandSearchPhase createExpandSearchPhase() { + MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(null, null, null, null, false, null, 1); + return new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, null); + } + + private SearchRequestOperationsListener createSearchRequestOperationsListener( + SearchCoordinatorStatsTesting searchCoordinatorStatsTesting + ) { + return new SearchRequestOperationsListener() { + @Override + public void onDFSPreQueryPhaseStart(SearchPhaseContext context) { + assertNotNull(context); + searchCoordinatorStatsTesting.dfsPreQueryPhaseStart.incrementAndGet(); + } + + @Override + public void onDFSPreQueryPhaseFailure(SearchPhaseContext context) { + assertNotNull(context); + searchCoordinatorStatsTesting.dfsPreQueryPhaseFailure.incrementAndGet(); + } + + @Override + public void onDFSPreQueryPhaseEnd(SearchPhaseContext context, long tookTime) { + assertNotNull(context); + searchCoordinatorStatsTesting.dfsPreQueryPhaseEnd.incrementAndGet(); + } + + @Override + public void onCanMatchPhaseStart(SearchPhaseContext context) { + assertNotNull(context); + searchCoordinatorStatsTesting.canMatchPhaseStart.incrementAndGet(); + } + + @Override + public void onCanMatchPhaseFailure(SearchPhaseContext context) { + assertNotNull(context); + searchCoordinatorStatsTesting.canMatchPhaseFailure.incrementAndGet(); + } + + @Override + public void onCanMatchPhaseEnd(SearchPhaseContext context, long tookTime) { + assertNotNull(context); + searchCoordinatorStatsTesting.canMatchPhaseEnd.incrementAndGet(); + } + + @Override + public void onQueryPhaseStart(SearchPhaseContext context) { + assertNotNull(context); + searchCoordinatorStatsTesting.queryPhaseStart.incrementAndGet(); + } + + @Override + public void onQueryPhaseFailure(SearchPhaseContext context) { + assertNotNull(context); + searchCoordinatorStatsTesting.queryPhaseFailure.incrementAndGet(); + } + + @Override + public void onQueryPhaseEnd(SearchPhaseContext context, long tookTime) { + assertNotNull(context); + searchCoordinatorStatsTesting.queryPhaseEnd.incrementAndGet(); + } + + @Override + public void onFetchPhaseStart(SearchPhaseContext context) { + assertNotNull(context); + searchCoordinatorStatsTesting.fetchPhaseStart.incrementAndGet(); + } + + @Override + public void onFetchPhaseFailure(SearchPhaseContext context) { + assertNotNull(context); + searchCoordinatorStatsTesting.fetchPhaseFailure.incrementAndGet(); + } + + @Override + public void onFetchPhaseEnd(SearchPhaseContext context, long tookTime) { + assertNotNull(context); + searchCoordinatorStatsTesting.fetchPhaseEnd.incrementAndGet(); + } + + @Override + public void onExpandSearchPhaseStart(SearchPhaseContext context) { + assertNotNull(context); + searchCoordinatorStatsTesting.expandPhaseStart.incrementAndGet(); + } + + @Override + public void onExpandSearchPhaseFailure(SearchPhaseContext context) { + assertNotNull(context); + searchCoordinatorStatsTesting.expandPhaseFailure.incrementAndGet(); + } + + @Override + public void onExpandSearchPhaseEnd(SearchPhaseContext context, long tookTime) { + assertNotNull(context); + searchCoordinatorStatsTesting.expandPhaseEnd.incrementAndGet(); + } + }; + } + private static final class PhaseResult extends SearchPhaseResult { PhaseResult(ShardSearchContextId contextId) { this.contextId = contextId; } } + + private static final class SearchCoordinatorStatsTesting { + public AtomicInteger dfsPreQueryPhaseStart = new AtomicInteger(); + public AtomicInteger dfsPreQueryPhaseFailure = new AtomicInteger(); + public AtomicInteger dfsPreQueryPhaseEnd = new AtomicInteger(); + public AtomicInteger canMatchPhaseStart = new AtomicInteger(); + public AtomicInteger canMatchPhaseFailure = new AtomicInteger(); + public AtomicInteger canMatchPhaseEnd = new AtomicInteger(); + public AtomicInteger queryPhaseStart = new AtomicInteger(); + public AtomicInteger queryPhaseFailure = new AtomicInteger(); + public AtomicInteger queryPhaseEnd = new AtomicInteger(); + public AtomicInteger fetchPhaseStart = new AtomicInteger(); + public AtomicInteger fetchPhaseFailure = new AtomicInteger(); + public AtomicInteger fetchPhaseEnd = new AtomicInteger(); + public AtomicInteger expandPhaseStart = new AtomicInteger(); + public AtomicInteger expandPhaseFailure = new AtomicInteger(); + public AtomicInteger expandPhaseEnd = new AtomicInteger(); + + public SearchCoordinatorStatsTesting() {}; + } } diff --git a/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java index f5a705c0e1033..db7b8736b1b0c 100644 --- a/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java @@ -99,6 +99,11 @@ public SearchRequest getRequest() { return searchRequest; } + @Override + public SearchPhase getCurrentPhase() { + return null; + } + @Override public void sendSearchResponse(InternalSearchResponse internalSearchResponse, AtomicArray queryResults) { String scrollId = getRequest().scroll() != null ? TransportSearchHelper.buildScrollId(queryResults, Version.CURRENT) : null; diff --git a/server/src/test/java/org/opensearch/action/search/SearchCoordinatorStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchCoordinatorStatsTests.java new file mode 100644 index 0000000000000..17e93f5ce16cf --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchCoordinatorStatsTests.java @@ -0,0 +1,128 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; + +public class SearchCoordinatorStatsTests extends OpenSearchTestCase { + public void testSearchCoordinatorStats() { + SearchCoordinatorStats testCoordinatorStats = new SearchCoordinatorStats(); + + SearchPhaseContext ctx = new MockSearchPhaseContext(1); + long tookTime = 10; + + testCoordinatorStats.onDFSPreQueryPhaseStart(ctx); + assertEquals(1, testCoordinatorStats.getDFSPreQueryCurrent()); + + testCoordinatorStats.onDFSPreQueryPhaseEnd(ctx, 10); + assertEquals(0, testCoordinatorStats.getDFSPreQueryCurrent()); + assertEquals(1, testCoordinatorStats.getDFSPreQueryTotal()); + assertEquals(tookTime, testCoordinatorStats.getDFSPreQueryMetric()); + + testCoordinatorStats.onCanMatchPhaseStart(ctx); + assertEquals(1, testCoordinatorStats.getCanMatchCurrent()); + + testCoordinatorStats.onCanMatchPhaseEnd(ctx, 10); + assertEquals(0, testCoordinatorStats.getCanMatchCurrent()); + assertEquals(1, testCoordinatorStats.getCanMatchTotal()); + assertEquals(tookTime, testCoordinatorStats.getCanMatchMetric()); + + testCoordinatorStats.onQueryPhaseStart(ctx); + assertEquals(1, testCoordinatorStats.getQueryCurrent()); + + testCoordinatorStats.onQueryPhaseEnd(ctx, 10); + assertEquals(0, testCoordinatorStats.getQueryCurrent()); + assertEquals(1, testCoordinatorStats.getQueryTotal()); + assertEquals(tookTime, testCoordinatorStats.getQueryMetric()); + + testCoordinatorStats.onFetchPhaseStart(ctx); + assertEquals(1, testCoordinatorStats.getFetchCurrent()); + + testCoordinatorStats.onFetchPhaseEnd(ctx, 10); + assertEquals(0, testCoordinatorStats.getFetchCurrent()); + assertEquals(1, testCoordinatorStats.getFetchTotal()); + assertEquals(tookTime, testCoordinatorStats.getFetchMetric()); + + testCoordinatorStats.onExpandSearchPhaseStart(ctx); + assertEquals(1, testCoordinatorStats.getExpandSearchCurrent()); + + testCoordinatorStats.onExpandSearchPhaseEnd(ctx, 10); + assertEquals(0, testCoordinatorStats.getExpandSearchCurrent()); + assertEquals(1, testCoordinatorStats.getExpandSearchTotal()); + assertEquals(tookTime, testCoordinatorStats.getExpandSearchMetric()); + } + + public void testSearchCoordinatorStatsMulti() throws InterruptedException { + SearchCoordinatorStats testCoordinatorStats = new SearchCoordinatorStats(); + SearchPhaseContext ctx = new MockSearchPhaseContext(1); + long tookTime = 10; + int numTasks = randomIntBetween(5, 50); + + Thread[] threads = new Thread[numTasks]; + + Phaser phaser = new Phaser(numTasks); + + CountDownLatch countDownLatch = new CountDownLatch(numTasks); + for (int i = 0; i < numTasks; i++) { + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + testCoordinatorStats.onDFSPreQueryPhaseStart(ctx); + + phaser.arriveAndAwaitAdvance(); + testCoordinatorStats.onDFSPreQueryPhaseEnd(ctx, tookTime); + + phaser.arriveAndAwaitAdvance(); + testCoordinatorStats.onCanMatchPhaseStart(ctx); + + phaser.arriveAndAwaitAdvance(); + testCoordinatorStats.onCanMatchPhaseEnd(ctx, tookTime); + + phaser.arriveAndAwaitAdvance(); + testCoordinatorStats.onQueryPhaseStart(ctx); + + phaser.arriveAndAwaitAdvance(); + testCoordinatorStats.onQueryPhaseEnd(ctx, tookTime); + + phaser.arriveAndAwaitAdvance(); + testCoordinatorStats.onFetchPhaseStart(ctx); + + phaser.arriveAndAwaitAdvance(); + testCoordinatorStats.onFetchPhaseEnd(ctx, tookTime); + + phaser.arriveAndAwaitAdvance(); + testCoordinatorStats.onExpandSearchPhaseStart(ctx); + + phaser.arriveAndAwaitAdvance(); + testCoordinatorStats.onExpandSearchPhaseEnd(ctx, tookTime); + countDownLatch.countDown(); + }); + threads[i].start(); + } + + countDownLatch.await(); + + assertEquals(numTasks, testCoordinatorStats.getDFSPreQueryTotal()); + assertEquals(numTasks * tookTime, testCoordinatorStats.getDFSPreQueryMetric()); + + assertEquals(numTasks, testCoordinatorStats.getCanMatchTotal()); + assertEquals(numTasks * tookTime, testCoordinatorStats.getCanMatchMetric()); + + assertEquals(numTasks, testCoordinatorStats.getQueryTotal()); + assertEquals(numTasks * tookTime, testCoordinatorStats.getQueryMetric()); + + assertEquals(numTasks, testCoordinatorStats.getFetchTotal()); + assertEquals(numTasks * tookTime, testCoordinatorStats.getFetchMetric()); + + assertEquals(numTasks, testCoordinatorStats.getExpandSearchTotal()); + assertEquals(numTasks * tookTime, testCoordinatorStats.getExpandSearchMetric()); + } +} diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java index d0a1307d33235..9e8af654d70aa 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -62,8 +62,7 @@ import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.transport.Transport; -import java.util.Collections; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -226,6 +225,7 @@ public void run() { }; } }; + action.start(); latch.await(); assertThat(successfulOps.get(), equalTo(numShards)); diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java new file mode 100644 index 0000000000000..6039645b1ae32 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java @@ -0,0 +1,395 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class SearchRequestOperationsListenerTests extends OpenSearchTestCase { + + public void testListenersAreExecuted() { + AtomicInteger dfsPreQueryPhaseStart = new AtomicInteger(); + AtomicInteger dfsPreQueryPhaseFailure = new AtomicInteger(); + AtomicInteger dfsPreQueryPhaseEnd = new AtomicInteger(); + AtomicInteger canMatchPhaseStart = new AtomicInteger(); + AtomicInteger canMatchPhaseFailure = new AtomicInteger(); + AtomicInteger canMatchPhaseEnd = new AtomicInteger(); + AtomicInteger queryPhaseStart = new AtomicInteger(); + AtomicInteger queryPhaseFailure = new AtomicInteger(); + AtomicInteger queryPhaseEnd = new AtomicInteger(); + AtomicInteger fetchPhaseStart = new AtomicInteger(); + AtomicInteger fetchPhaseFailure = new AtomicInteger(); + AtomicInteger fetchPhaseEnd = new AtomicInteger(); + AtomicInteger expandPhaseStart = new AtomicInteger(); + AtomicInteger expandPhaseFailure = new AtomicInteger(); + AtomicInteger expandPhaseEnd = new AtomicInteger(); + AtomicInteger timeInNanos = new AtomicInteger(randomIntBetween(0, 10)); + + SearchRequestOperationsListener testListener = new SearchRequestOperationsListener() { + @Override + public void onDFSPreQueryPhaseStart(SearchPhaseContext context) { + assertNotNull(context); + dfsPreQueryPhaseStart.incrementAndGet(); + } + + @Override + public void onDFSPreQueryPhaseFailure(SearchPhaseContext context) { + assertNotNull(context); + dfsPreQueryPhaseFailure.incrementAndGet(); + } + + @Override + public void onDFSPreQueryPhaseEnd(SearchPhaseContext context, long tookTime) { + assertEquals(timeInNanos.get(), tookTime); + assertNotNull(context); + dfsPreQueryPhaseEnd.incrementAndGet(); + } + + @Override + public void onCanMatchPhaseStart(SearchPhaseContext context) { + assertNotNull(context); + canMatchPhaseStart.incrementAndGet(); + } + + @Override + public void onCanMatchPhaseFailure(SearchPhaseContext context) { + assertNotNull(context); + canMatchPhaseFailure.incrementAndGet(); + } + + @Override + public void onCanMatchPhaseEnd(SearchPhaseContext context, long tookTime) { + assertNotNull(context); + canMatchPhaseEnd.incrementAndGet(); + } + + @Override + public void onQueryPhaseStart(SearchPhaseContext context) { + assertNotNull(context); + queryPhaseStart.incrementAndGet(); + } + + @Override + public void onQueryPhaseFailure(SearchPhaseContext context) { + assertNotNull(context); + queryPhaseFailure.incrementAndGet(); + } + + @Override + public void onQueryPhaseEnd(SearchPhaseContext context, long tookTime) { + assertNotNull(context); + queryPhaseEnd.incrementAndGet(); + } + + @Override + public void onFetchPhaseStart(SearchPhaseContext context) { + assertNotNull(context); + fetchPhaseStart.incrementAndGet(); + } + + @Override + public void onFetchPhaseFailure(SearchPhaseContext context) { + assertNotNull(context); + fetchPhaseFailure.incrementAndGet(); + } + + @Override + public void onFetchPhaseEnd(SearchPhaseContext context, long tookTime) { + assertNotNull(context); + fetchPhaseEnd.incrementAndGet(); + } + + @Override + public void onExpandSearchPhaseStart(SearchPhaseContext context) { + assertNotNull(context); + expandPhaseStart.incrementAndGet(); + } + + @Override + public void onExpandSearchPhaseFailure(SearchPhaseContext context) { + assertNotNull(context); + expandPhaseFailure.incrementAndGet(); + } + + @Override + public void onExpandSearchPhaseEnd(SearchPhaseContext context, long tookTime) { + assertNotNull(context); + expandPhaseEnd.incrementAndGet(); + } + }; + + final List requestOperationListeners = new ArrayList<>(Arrays.asList(testListener, testListener)); + SearchRequestOperationsListener compositeListener = new SearchRequestOperationsListener.CompositeListener( + requestOperationListeners, + logger + ); + + SearchPhaseContext ctx = new MockSearchPhaseContext(1); + + compositeListener.onDFSPreQueryPhaseStart(ctx); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(0, dfsPreQueryPhaseFailure.get()); + assertEquals(0, dfsPreQueryPhaseEnd.get()); + assertEquals(0, canMatchPhaseStart.get()); + assertEquals(0, canMatchPhaseFailure.get()); + assertEquals(0, canMatchPhaseEnd.get()); + assertEquals(0, queryPhaseStart.get()); + assertEquals(0, queryPhaseFailure.get()); + assertEquals(0, queryPhaseEnd.get()); + assertEquals(0, fetchPhaseStart.get()); + assertEquals(0, fetchPhaseFailure.get()); + assertEquals(0, fetchPhaseEnd.get()); + assertEquals(0, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onDFSPreQueryPhaseFailure(ctx); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(0, dfsPreQueryPhaseEnd.get()); + assertEquals(0, canMatchPhaseStart.get()); + assertEquals(0, canMatchPhaseFailure.get()); + assertEquals(0, canMatchPhaseEnd.get()); + assertEquals(0, queryPhaseStart.get()); + assertEquals(0, queryPhaseFailure.get()); + assertEquals(0, queryPhaseEnd.get()); + assertEquals(0, fetchPhaseStart.get()); + assertEquals(0, fetchPhaseFailure.get()); + assertEquals(0, fetchPhaseEnd.get()); + assertEquals(0, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onDFSPreQueryPhaseEnd(ctx, timeInNanos.get()); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(0, canMatchPhaseStart.get()); + assertEquals(0, canMatchPhaseFailure.get()); + assertEquals(0, canMatchPhaseEnd.get()); + assertEquals(0, queryPhaseStart.get()); + assertEquals(0, queryPhaseFailure.get()); + assertEquals(0, queryPhaseEnd.get()); + assertEquals(0, fetchPhaseStart.get()); + assertEquals(0, fetchPhaseFailure.get()); + assertEquals(0, fetchPhaseEnd.get()); + assertEquals(0, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onCanMatchPhaseStart(ctx); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(2, canMatchPhaseStart.get()); + assertEquals(0, canMatchPhaseFailure.get()); + assertEquals(0, canMatchPhaseEnd.get()); + assertEquals(0, queryPhaseStart.get()); + assertEquals(0, queryPhaseFailure.get()); + assertEquals(0, queryPhaseEnd.get()); + assertEquals(0, fetchPhaseStart.get()); + assertEquals(0, fetchPhaseFailure.get()); + assertEquals(0, fetchPhaseEnd.get()); + assertEquals(0, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onCanMatchPhaseFailure(ctx); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(2, canMatchPhaseStart.get()); + assertEquals(2, canMatchPhaseFailure.get()); + assertEquals(0, canMatchPhaseEnd.get()); + assertEquals(0, queryPhaseStart.get()); + assertEquals(0, queryPhaseFailure.get()); + assertEquals(0, queryPhaseEnd.get()); + assertEquals(0, fetchPhaseStart.get()); + assertEquals(0, fetchPhaseFailure.get()); + assertEquals(0, fetchPhaseEnd.get()); + assertEquals(0, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onCanMatchPhaseEnd(ctx, timeInNanos.get()); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(2, canMatchPhaseStart.get()); + assertEquals(2, canMatchPhaseFailure.get()); + assertEquals(2, canMatchPhaseEnd.get()); + assertEquals(0, queryPhaseStart.get()); + assertEquals(0, queryPhaseFailure.get()); + assertEquals(0, queryPhaseEnd.get()); + assertEquals(0, fetchPhaseStart.get()); + assertEquals(0, fetchPhaseFailure.get()); + assertEquals(0, fetchPhaseEnd.get()); + assertEquals(0, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onQueryPhaseStart(ctx); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(2, canMatchPhaseStart.get()); + assertEquals(2, canMatchPhaseFailure.get()); + assertEquals(2, canMatchPhaseEnd.get()); + assertEquals(2, queryPhaseStart.get()); + assertEquals(0, queryPhaseFailure.get()); + assertEquals(0, queryPhaseEnd.get()); + assertEquals(0, fetchPhaseStart.get()); + assertEquals(0, fetchPhaseFailure.get()); + assertEquals(0, fetchPhaseEnd.get()); + assertEquals(0, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onQueryPhaseFailure(ctx); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(2, canMatchPhaseStart.get()); + assertEquals(2, canMatchPhaseFailure.get()); + assertEquals(2, canMatchPhaseEnd.get()); + assertEquals(2, queryPhaseStart.get()); + assertEquals(2, queryPhaseFailure.get()); + assertEquals(0, queryPhaseEnd.get()); + assertEquals(0, fetchPhaseStart.get()); + assertEquals(0, fetchPhaseFailure.get()); + assertEquals(0, fetchPhaseEnd.get()); + assertEquals(0, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onQueryPhaseEnd(ctx, timeInNanos.get()); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(2, canMatchPhaseStart.get()); + assertEquals(2, canMatchPhaseFailure.get()); + assertEquals(2, canMatchPhaseEnd.get()); + assertEquals(2, queryPhaseStart.get()); + assertEquals(2, queryPhaseFailure.get()); + assertEquals(2, queryPhaseEnd.get()); + assertEquals(0, fetchPhaseStart.get()); + assertEquals(0, fetchPhaseFailure.get()); + assertEquals(0, fetchPhaseEnd.get()); + assertEquals(0, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onFetchPhaseStart(ctx); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(2, canMatchPhaseStart.get()); + assertEquals(2, canMatchPhaseFailure.get()); + assertEquals(2, canMatchPhaseEnd.get()); + assertEquals(2, queryPhaseStart.get()); + assertEquals(2, queryPhaseFailure.get()); + assertEquals(2, queryPhaseEnd.get()); + assertEquals(2, fetchPhaseStart.get()); + assertEquals(0, fetchPhaseFailure.get()); + assertEquals(0, fetchPhaseEnd.get()); + assertEquals(0, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onFetchPhaseFailure(ctx); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(2, canMatchPhaseStart.get()); + assertEquals(2, canMatchPhaseFailure.get()); + assertEquals(2, canMatchPhaseEnd.get()); + assertEquals(2, queryPhaseStart.get()); + assertEquals(2, queryPhaseFailure.get()); + assertEquals(2, queryPhaseEnd.get()); + assertEquals(2, fetchPhaseStart.get()); + assertEquals(2, fetchPhaseFailure.get()); + assertEquals(0, fetchPhaseEnd.get()); + assertEquals(0, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onFetchPhaseEnd(ctx, timeInNanos.get()); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(2, canMatchPhaseStart.get()); + assertEquals(2, canMatchPhaseFailure.get()); + assertEquals(2, canMatchPhaseEnd.get()); + assertEquals(2, queryPhaseStart.get()); + assertEquals(2, queryPhaseFailure.get()); + assertEquals(2, queryPhaseEnd.get()); + assertEquals(2, fetchPhaseStart.get()); + assertEquals(2, fetchPhaseFailure.get()); + assertEquals(2, fetchPhaseEnd.get()); + assertEquals(0, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onExpandSearchPhaseStart(ctx); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(2, canMatchPhaseStart.get()); + assertEquals(2, canMatchPhaseFailure.get()); + assertEquals(2, canMatchPhaseEnd.get()); + assertEquals(2, queryPhaseStart.get()); + assertEquals(2, queryPhaseFailure.get()); + assertEquals(2, queryPhaseEnd.get()); + assertEquals(2, fetchPhaseStart.get()); + assertEquals(2, fetchPhaseFailure.get()); + assertEquals(2, fetchPhaseEnd.get()); + assertEquals(2, expandPhaseStart.get()); + assertEquals(0, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onExpandSearchPhaseFailure(ctx); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(2, canMatchPhaseStart.get()); + assertEquals(2, canMatchPhaseFailure.get()); + assertEquals(2, canMatchPhaseEnd.get()); + assertEquals(2, queryPhaseStart.get()); + assertEquals(2, queryPhaseFailure.get()); + assertEquals(2, queryPhaseEnd.get()); + assertEquals(2, fetchPhaseStart.get()); + assertEquals(2, fetchPhaseFailure.get()); + assertEquals(2, fetchPhaseEnd.get()); + assertEquals(2, expandPhaseStart.get()); + assertEquals(2, expandPhaseFailure.get()); + assertEquals(0, expandPhaseEnd.get()); + + compositeListener.onExpandSearchPhaseEnd(ctx, timeInNanos.get()); + assertEquals(2, dfsPreQueryPhaseStart.get()); + assertEquals(2, dfsPreQueryPhaseFailure.get()); + assertEquals(2, dfsPreQueryPhaseEnd.get()); + assertEquals(2, canMatchPhaseStart.get()); + assertEquals(2, canMatchPhaseFailure.get()); + assertEquals(2, canMatchPhaseEnd.get()); + assertEquals(2, queryPhaseStart.get()); + assertEquals(2, queryPhaseFailure.get()); + assertEquals(2, queryPhaseEnd.get()); + assertEquals(2, fetchPhaseStart.get()); + assertEquals(2, fetchPhaseFailure.get()); + assertEquals(2, fetchPhaseEnd.get()); + assertEquals(2, expandPhaseStart.get()); + assertEquals(2, expandPhaseFailure.get()); + assertEquals(2, expandPhaseEnd.get()); + } + +} diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index 7d2d8e38d066e..b3ac9dffe4736 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -32,6 +32,7 @@ package org.opensearch.index.search.stats; +import org.opensearch.action.search.SearchCoordinatorStats; import org.opensearch.index.search.stats.SearchStats.Stats; import org.opensearch.test.OpenSearchTestCase; @@ -63,6 +64,30 @@ public void testShardLevelSearchGroupStats() throws Exception { // adding again would then return wrong search stats (would return 4! instead of 3) searchStats1.add(searchStats2); assertStats(groupStats1.get("group1"), 3); + + long paramValue = 1; + + // Testing for coordinator stats + SearchCoordinatorStats testCoordinatorStats = new SearchCoordinatorStats(); + testCoordinatorStats.totalStats.dfsPreQueryMetric.inc(paramValue); + testCoordinatorStats.totalStats.dfsPreQueryCurrent.inc(paramValue); + testCoordinatorStats.totalStats.dfsPreQueryTotal.inc(paramValue); + testCoordinatorStats.totalStats.canMatchMetric.inc(paramValue); + testCoordinatorStats.totalStats.canMatchCurrent.inc(paramValue); + testCoordinatorStats.totalStats.canMatchTotal.inc(paramValue); + testCoordinatorStats.totalStats.queryMetric.inc(paramValue); + testCoordinatorStats.totalStats.queryCurrent.inc(paramValue); + testCoordinatorStats.totalStats.queryTotal.inc(paramValue); + testCoordinatorStats.totalStats.fetchMetric.inc(paramValue); + testCoordinatorStats.totalStats.fetchCurrent.inc(paramValue); + testCoordinatorStats.totalStats.fetchTotal.inc(paramValue); + testCoordinatorStats.totalStats.expandSearchMetric.inc(paramValue); + testCoordinatorStats.totalStats.expandSearchCurrent.inc(paramValue); + testCoordinatorStats.totalStats.expandSearchTotal.inc(paramValue); + + searchStats1.setSearchCoordinatorStats(testCoordinatorStats); + assertCoordinatorStats(searchStats1.getTotal(), paramValue); + } private static void assertStats(Stats stats, long equalTo) { @@ -83,4 +108,22 @@ private static void assertStats(Stats stats, long equalTo) { assertEquals(equalTo, stats.getSuggestCurrent()); } + private static void assertCoordinatorStats(Stats stats, long equalTo) { + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().dfsPreQueryMetric); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().dfsPreQueryCurrent); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().dfsPreQueryTotal); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().canMatchMetric); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().canMatchCurrent); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().canMatchTotal); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().queryMetric); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().queryCurrent); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().queryTotal); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().fetchMetric); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().fetchCurrent); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().fetchTotal); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().expandSearchMetric); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().expandSearchCurrent); + assertEquals(equalTo, stats.getCoordinatorStatsLongHolder().expandSearchTotal); + } + } diff --git a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java index 9be45d4e77940..473552a466c1a 100644 --- a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java +++ b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java @@ -43,7 +43,7 @@ public class NodeIndicesStatsTests extends OpenSearchTestCase { public void testInvalidLevel() { - final NodeIndicesStats stats = new NodeIndicesStats(null, Collections.emptyMap()); + final NodeIndicesStats stats = new NodeIndicesStats(null, Collections.emptyMap(), null); final String level = randomAlphaOfLength(16); final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level)); final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> stats.toXContent(null, params)); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index c04e2821d7931..d4289bc3fb279 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2062,7 +2062,8 @@ public void onFailure(final Exception e) { emptyMap(), new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), repositoriesServiceReference::get, - fileCacheCleaner + fileCacheCleaner, + null ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService( @@ -2290,7 +2291,8 @@ public void onFailure(final Exception e) { namedWriteableRegistry, List.of(), client - ) + ), + null ) ); actions.put( From 80ab4c2e6ce81869d5519e4dfa2599b7ee98a02b Mon Sep 17 00:00:00 2001 From: sahil buddharaju Date: Fri, 30 Jun 2023 14:18:21 -0700 Subject: [PATCH 02/45] Changed version Signed-off-by: sahil buddharaju --- .../java/org/opensearch/index/search/stats/SearchStats.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java index 150368c629ef8..6ae6cc9906cb6 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java @@ -179,7 +179,7 @@ private Stats(StreamInput in) throws IOException { pitCurrent = in.readVLong(); } - if (in.getVersion().onOrAfter(Version.V_2_0_0)) { + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { this.coordinatorStatsLongHolder = new CoordinatorStatsLongHolder(); coordinatorStatsLongHolder.dfsPreQueryMetric = in.readVLong(); coordinatorStatsLongHolder.dfsPreQueryCurrent= in.readVLong(); @@ -353,7 +353,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(pitCurrent); } - if (out.getVersion().onOrAfter(Version.V_2_0_0)) { + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeVLong(coordinatorStatsLongHolder.dfsPreQueryMetric); out.writeVLong(coordinatorStatsLongHolder.dfsPreQueryCurrent); out.writeVLong(coordinatorStatsLongHolder.dfsPreQueryTotal); From e0fc5cb677f5418a5cb7d58262c21e2893cd8cb2 Mon Sep 17 00:00:00 2001 From: sahil buddharaju Date: Fri, 30 Jun 2023 14:52:12 -0700 Subject: [PATCH 03/45] Applied formatting Signed-off-by: sahil buddharaju --- .../action/search/AbstractSearchAsyncAction.java | 2 +- .../org/opensearch/common/metrics/CounterMetric.java | 1 - .../opensearch/index/search/stats/SearchStats.java | 12 ++++++------ .../search/SearchQueryThenFetchAsyncActionTests.java | 3 ++- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 595796319d2d8..2b9153b7333c3 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -448,7 +448,7 @@ public void onPhaseEnd(SearchPhaseContext searchPhaseContext) { searchRequestOperationsListener.onCanMatchPhaseEnd(searchPhaseContext, tookTimeInMillis); } else if (searchPhaseContext.getCurrentPhase() instanceof DfsQueryPhase) { searchRequestOperationsListener.onQueryPhaseEnd(searchPhaseContext, tookTimeInMillis); - }else if (searchPhaseContext.getCurrentPhase() instanceof SearchQueryThenFetchAsyncAction) { + } else if (searchPhaseContext.getCurrentPhase() instanceof SearchQueryThenFetchAsyncAction) { searchRequestOperationsListener.onQueryPhaseEnd(searchPhaseContext, tookTimeInMillis); } else if (searchPhaseContext.getCurrentPhase() instanceof FetchSearchPhase) { searchRequestOperationsListener.onFetchPhaseEnd(searchPhaseContext, tookTimeInMillis); diff --git a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java index a20b30dd7d27b..cb181840406a5 100644 --- a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java +++ b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java @@ -63,5 +63,4 @@ public long count() { return counter.sum(); } - } diff --git a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java index 6ae6cc9906cb6..9c67341fee4a7 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java @@ -33,7 +33,6 @@ package org.opensearch.index.search.stats; import org.opensearch.Version; -import org.opensearch.action.CoordinatorStats; import org.opensearch.action.search.SearchCoordinatorStats; import org.opensearch.common.Nullable; import org.opensearch.core.common.Strings; @@ -81,6 +80,7 @@ public static class CoordinatorStatsLongHolder { public long expandSearchCurrent; public long expandSearchTotal; } + public static class Stats implements Writeable, ToXContentFragment { private long queryCount; @@ -182,23 +182,23 @@ private Stats(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_3_0_0)) { this.coordinatorStatsLongHolder = new CoordinatorStatsLongHolder(); coordinatorStatsLongHolder.dfsPreQueryMetric = in.readVLong(); - coordinatorStatsLongHolder.dfsPreQueryCurrent= in.readVLong(); + coordinatorStatsLongHolder.dfsPreQueryCurrent = in.readVLong(); coordinatorStatsLongHolder.dfsPreQueryTotal = in.readVLong(); coordinatorStatsLongHolder.canMatchMetric = in.readVLong(); - coordinatorStatsLongHolder.canMatchCurrent= in.readVLong(); + coordinatorStatsLongHolder.canMatchCurrent = in.readVLong(); coordinatorStatsLongHolder.canMatchTotal = in.readVLong(); coordinatorStatsLongHolder.queryMetric = in.readVLong(); - coordinatorStatsLongHolder.queryCurrent= in.readVLong(); + coordinatorStatsLongHolder.queryCurrent = in.readVLong(); coordinatorStatsLongHolder.queryTotal = in.readVLong(); coordinatorStatsLongHolder.fetchMetric = in.readVLong(); - coordinatorStatsLongHolder.fetchCurrent= in.readVLong(); + coordinatorStatsLongHolder.fetchCurrent = in.readVLong(); coordinatorStatsLongHolder.fetchTotal = in.readVLong(); coordinatorStatsLongHolder.expandSearchMetric = in.readVLong(); - coordinatorStatsLongHolder.expandSearchCurrent= in.readVLong(); + coordinatorStatsLongHolder.expandSearchCurrent = in.readVLong(); coordinatorStatsLongHolder.expandSearchTotal = in.readVLong(); } } diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 9e8af654d70aa..668912b5bc718 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -62,7 +62,8 @@ import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.transport.Transport; -import java.util.*; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; From e21f2b68f22e118500580fe4326db945be6b3faa Mon Sep 17 00:00:00 2001 From: sahil buddharaju Date: Wed, 5 Jul 2023 14:15:12 -0700 Subject: [PATCH 04/45] Fixed NodeIndicesStatsTests bug Signed-off-by: sahil buddharaju --- .../search/SearchWeightedRoutingIT.java | 16 ++++++++++++++-- .../indices/NodeIndicesStatsTests.java | 6 +++++- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 28a4db5ecaf9d..c5bef50e81d7b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -56,8 +56,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.greaterThan; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.search.aggregations.AggregationBuilders.terms; @@ -180,12 +180,24 @@ public void testSearchWithWRRShardRouting() throws IOException { assertFalse(!hitNodes.contains(nodeId)); } nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); + int num = 0; + int coordNumber = 0; for (NodeStats stat : nodeStats.getNodes()) { SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); + if (searchStats.getCoordinatorStatsLongHolder().queryMetric > 0) { + assertThat(searchStats.getCoordinatorStatsLongHolder().queryTotal, greaterThan(0L)); + assertThat(searchStats.getCoordinatorStatsLongHolder().fetchMetric, greaterThan(0L)); + assertThat(searchStats.getCoordinatorStatsLongHolder().fetchTotal, greaterThan(0L)); + assertThat(searchStats.getCoordinatorStatsLongHolder().expandSearchTotal, greaterThan(0L)); + coordNumber += 1; + } Assert.assertTrue(searchStats.getQueryCount() > 0L); Assert.assertTrue(searchStats.getFetchCount() > 0L); + num++; } + assertThat(coordNumber, greaterThan(0)); + assertThat(num, greaterThan(0)); } private Map> setupCluster(int nodeCountPerAZ, Settings commonSettings) { diff --git a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java index 473552a466c1a..77acfbf2255f4 100644 --- a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java +++ b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java @@ -32,6 +32,8 @@ package org.opensearch.indices; +import org.opensearch.action.CoordinatorStats; +import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.test.OpenSearchTestCase; @@ -43,7 +45,9 @@ public class NodeIndicesStatsTests extends OpenSearchTestCase { public void testInvalidLevel() { - final NodeIndicesStats stats = new NodeIndicesStats(null, Collections.emptyMap(), null); + CommonStats oldStats = new CommonStats(); + CoordinatorStats coordinatorStats = new CoordinatorStats(); + final NodeIndicesStats stats = new NodeIndicesStats(oldStats, Collections.emptyMap(), coordinatorStats); final String level = randomAlphaOfLength(16); final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level)); final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> stats.toXContent(null, params)); From 4a12c18ee95f31d7e9eb8a17029c4448ba18c559 Mon Sep 17 00:00:00 2001 From: sahil <61558528+buddharajusahil@users.noreply.github.com> Date: Fri, 30 Jun 2023 14:56:38 -0700 Subject: [PATCH 05/45] Update Debug_OpenSearch.xml Signed-off-by: sahil <61558528+buddharajusahil@users.noreply.github.com> Signed-off-by: sahil buddharaju --- .idea/runConfigurations/Debug_OpenSearch.xml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/.idea/runConfigurations/Debug_OpenSearch.xml b/.idea/runConfigurations/Debug_OpenSearch.xml index fddcf47728460..0d8bf59823acf 100644 --- a/.idea/runConfigurations/Debug_OpenSearch.xml +++ b/.idea/runConfigurations/Debug_OpenSearch.xml @@ -1,11 +1,11 @@ - - + + From 6efcae53b5e14d9d2d946894d26469cad7345c90 Mon Sep 17 00:00:00 2001 From: sahil buddharaju Date: Wed, 5 Jul 2023 16:01:26 -0700 Subject: [PATCH 06/45] Fixed wildcard imports Signed-off-by: sahil buddharaju --- .../java/org/opensearch/search/SearchWeightedRoutingIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index c5bef50e81d7b..d55857ea6d9dc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -56,8 +56,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.search.aggregations.AggregationBuilders.terms; From 62e608fa5cb3e9062eb1969f6a957022960d209b Mon Sep 17 00:00:00 2001 From: sahil buddharaju Date: Thu, 6 Jul 2023 13:35:42 -0700 Subject: [PATCH 07/45] Added negative test UT's to AbstractSearchAsyncAction Signed-off-by: sahil buddharaju --- .../AbstractSearchAsyncActionTests.java | 68 ++++++++++++++++++- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 41e257d1b7840..06ec40549f6d1 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -333,6 +333,60 @@ public void testSendSearchResponseDisallowPartialFailures() { assertEquals(requestIds, releasedContexts); } + public void testOnPhaseListenersFailure() { + SearchCoordinatorStatsTesting searchCoordinatorStatsTesting = new SearchCoordinatorStatsTesting(); + SearchRequestOperationsListener testListener = createSearchRequestOperationsListener(searchCoordinatorStatsTesting); + final List requestOperationListeners = new ArrayList<>(Arrays.asList(testListener, testListener)); + + SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(); + action.setSearchListenerList(requestOperationListeners); + action.setCurrentPhase(action); + action.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(2, searchCoordinatorStatsTesting.queryPhaseFailure.get()); + + SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(); + searchDfsQueryThenFetchAsyncAction.setSearchListenerList(requestOperationListeners); + searchDfsQueryThenFetchAsyncAction.setCurrentPhase(searchDfsQueryThenFetchAsyncAction); + searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(2, searchCoordinatorStatsTesting.dfsPreQueryPhaseFailure.get()); + + CanMatchPreFilterSearchPhase canMatchPreFilterSearchPhaseAction = createCanMatchPreFilterSearchPhase(); + canMatchPreFilterSearchPhaseAction.setSearchListenerList(requestOperationListeners); + canMatchPreFilterSearchPhaseAction.setCurrentPhase(canMatchPreFilterSearchPhaseAction); + + canMatchPreFilterSearchPhaseAction.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(2, searchCoordinatorStatsTesting.canMatchPhaseFailure.get()); + + FetchSearchPhase fetchPhase = createFetchSearchPhase(); + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); + searchShardIterator.resetAndSkip(); + action.skipShard(searchShardIterator); + action.executeNextPhase(action, fetchPhase); + action.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(2, searchCoordinatorStatsTesting.fetchPhaseFailure.get()); + } + public void testOnPhaseFailure() { SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false); AtomicReference exception = new AtomicReference<>(); @@ -341,6 +395,7 @@ public void testOnPhaseFailure() { List> nodeLookups = new ArrayList<>(); ArraySearchPhaseResults phaseResults = phaseResults(requestIds, nodeLookups, 0); AbstractSearchAsyncAction action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong()); + action.onPhaseFailure(new SearchPhase("test") { @Override public void run() { @@ -612,6 +667,9 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct shardsIter.size(), exc -> {} ); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); + return new SearchDfsQueryThenFetchAsyncAction( logger, null, @@ -623,7 +681,7 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct executor, resultConsumer, searchRequest, - null, + listener, shardsIter, null, null, @@ -652,6 +710,8 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction() shardsIter.size(), exc -> {} ); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); return new SearchQueryThenFetchAsyncAction( logger, null, @@ -663,7 +723,7 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction() executor, resultConsumer, searchRequest, - null, + listener, shardsIter, null, null, @@ -709,6 +769,8 @@ private CanMatchPreFilterSearchPhase createCanMatchPreFilterSearchPhase() { primaryNode, replicaNode ); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); return new CanMatchPreFilterSearchPhase( logger, searchTransportService, @@ -718,7 +780,7 @@ private CanMatchPreFilterSearchPhase createCanMatchPreFilterSearchPhase() { Collections.emptyMap(), OpenSearchExecutors.newDirectExecutorService(), searchRequest, - null, + listener, shardsIter, timeProvider, ClusterState.EMPTY_STATE, From 261bfa36d42af1fb7259c48b9ff7bb1200979526 Mon Sep 17 00:00:00 2001 From: sahil buddharaju Date: Tue, 18 Jul 2023 11:11:01 -0700 Subject: [PATCH 08/45] Formatting Signed-off-by: sahil buddharaju --- .../search/AbstractSearchAsyncAction.java | 19 ++- .../search/CanMatchPreFilterSearchPhase.java | 6 +- .../SearchDfsQueryThenFetchAsyncAction.java | 6 +- .../SearchQueryThenFetchAsyncAction.java | 7 +- .../SearchRequestOperationsListener.java | 30 ----- .../action/search/TransportSearchAction.java | 14 ++- .../AbstractSearchAsyncActionTests.java | 114 +++++++++--------- .../CanMatchPreFilterSearchPhaseTests.java | 18 ++- .../action/search/SearchAsyncActionTests.java | 16 ++- .../search/SearchCoordinatorStatsTests.java | 35 ++++++ .../SearchQueryThenFetchAsyncActionTests.java | 3 +- 11 files changed, 144 insertions(+), 124 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 2b9153b7333c3..50a092ce94d53 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -140,7 +140,8 @@ abstract class AbstractSearchAsyncAction exten SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + List searchListenersList ) { super(name); final List toSkipIterators = new ArrayList<>(); @@ -176,12 +177,10 @@ abstract class AbstractSearchAsyncAction exten this.indexRoutings = indexRoutings; this.results = resultConsumer; this.clusters = clusters; - - } - - public void setSearchListenerList(List searchListenersList) { - this.searchListenersList = searchListenersList; - this.searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(this.searchListenersList, logger); + if (searchListenersList != null) { + this.searchListenersList = searchListenersList; + this.searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(this.searchListenersList, logger); + } } @Override @@ -436,7 +435,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha } } - public void onPhaseEnd(SearchPhaseContext searchPhaseContext) { + private void onPhaseEnd(SearchPhaseContext searchPhaseContext) { if (searchRequestOperationsListener == null) { return; } @@ -457,7 +456,7 @@ public void onPhaseEnd(SearchPhaseContext searchPhaseContext) { } } - public void onPhaseStart(SearchPhase phase, SearchPhaseContext searchPhaseContext) { + private void onPhaseStart(SearchPhase phase, SearchPhaseContext searchPhaseContext) { setCurrentPhase(phase); phase.setStartTimeInNanos(System.nanoTime()); if (searchRequestOperationsListener == null) { @@ -663,7 +662,7 @@ public SearchPhase getCurrentPhase() { return currentPhase; } - public void setCurrentPhase(SearchPhase phase) { + private void setCurrentPhase(SearchPhase phase) { currentPhase = phase; } diff --git a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java index c026c72f77f00..3eb55263b2a9c 100644 --- a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java @@ -90,7 +90,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction, SearchPhase> phaseFactory, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + List searchListenersList ) { // We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests super( @@ -110,7 +111,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction searchListenersList ) { super( "dfs", @@ -95,7 +96,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction task, new ArraySearchPhaseResults<>(shardsIts.size()), request.getMaxConcurrentShardRequests(), - clusters + clusters, + searchListenersList ); this.queryPhaseResultConsumer = queryPhaseResultConsumer; this.searchPhaseController = searchPhaseController; diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java index 1ead14aac6b51..a1814f762c149 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -45,6 +45,7 @@ import org.opensearch.search.query.QuerySearchResult; import org.opensearch.transport.Transport; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; @@ -81,7 +82,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction searchListenersList ) { super( "query", @@ -100,7 +102,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listeners; private final Logger logger; private long canMatchPhaseStart; - private long canMatchPhaseEnd; - private long dfsPreQueryPhaseStart; - private long dfsPreQueryPhaseEnd; - private long queryPhaseStart; - private long queryPhaseEnd; - private long fetchPhaseStart; - private long fetchPhaseEnd; - private long expandSearchPhaseStart; - private long expandSearchPhaseEnd; - private long dfsPreQueryTotal; - private long canMatchTotal; - private long queryTotal; - private long fetchTotal; - private long expandSearchTotal; public CompositeListener(List listeners, Logger logger) { this.listeners = listeners; @@ -94,7 +79,6 @@ public CompositeListener(List listeners, Logger public void onDFSPreQueryPhaseStart(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { - dfsPreQueryPhaseStart = System.nanoTime(); listener.onDFSPreQueryPhaseStart(context); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e); @@ -106,8 +90,6 @@ public void onDFSPreQueryPhaseStart(SearchPhaseContext context) { public void onDFSPreQueryPhaseEnd(SearchPhaseContext context, long tookTime) { for (SearchRequestOperationsListener listener : listeners) { try { - dfsPreQueryPhaseEnd = System.nanoTime(); - dfsPreQueryTotal = TimeUnit.NANOSECONDS.toMillis(dfsPreQueryPhaseEnd - dfsPreQueryPhaseStart); listener.onDFSPreQueryPhaseEnd(context, tookTime); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onPhaseEnd listener [{}] failed", listener), e); @@ -130,7 +112,6 @@ public void onDFSPreQueryPhaseFailure(SearchPhaseContext context) { public void onCanMatchPhaseStart(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { - canMatchPhaseStart = System.nanoTime(); listener.onCanMatchPhaseStart(context); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e); @@ -142,8 +123,6 @@ public void onCanMatchPhaseStart(SearchPhaseContext context) { public void onCanMatchPhaseEnd(SearchPhaseContext context, long tookTime) { for (SearchRequestOperationsListener listener : listeners) { try { - canMatchPhaseEnd = System.nanoTime(); - canMatchTotal = TimeUnit.NANOSECONDS.toMillis(canMatchPhaseEnd - canMatchPhaseStart); listener.onCanMatchPhaseEnd(context, tookTime); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onPhaseEnd listener [{}] failed", listener), e); @@ -166,7 +145,6 @@ public void onCanMatchPhaseFailure(SearchPhaseContext context) { public void onQueryPhaseStart(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { - queryPhaseStart = System.nanoTime(); listener.onQueryPhaseStart(context); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e); @@ -178,8 +156,6 @@ public void onQueryPhaseStart(SearchPhaseContext context) { public void onQueryPhaseEnd(SearchPhaseContext context, long tookTime) { for (SearchRequestOperationsListener listener : listeners) { try { - queryPhaseEnd = System.nanoTime(); - queryTotal = TimeUnit.NANOSECONDS.toMillis(queryPhaseEnd - queryPhaseStart); listener.onQueryPhaseEnd(context, tookTime); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onPhaseEnd listener [{}] failed", listener), e); @@ -202,7 +178,6 @@ public void onQueryPhaseFailure(SearchPhaseContext context) { public void onFetchPhaseStart(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { - fetchPhaseStart = System.nanoTime(); listener.onFetchPhaseStart(context); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onFetchStart listener [{}] failed", listener), e); @@ -214,8 +189,6 @@ public void onFetchPhaseStart(SearchPhaseContext context) { public void onFetchPhaseEnd(SearchPhaseContext context, long tookTime) { for (SearchRequestOperationsListener listener : listeners) { try { - fetchPhaseEnd = System.nanoTime(); - fetchTotal = TimeUnit.NANOSECONDS.toMillis(fetchPhaseEnd - fetchPhaseStart); listener.onFetchPhaseEnd(context, tookTime); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onFetchEnd listener [{}] failed", listener), e); @@ -238,7 +211,6 @@ public void onFetchPhaseFailure(SearchPhaseContext context) { public void onExpandSearchPhaseStart(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { - expandSearchPhaseStart = System.nanoTime(); listener.onExpandSearchPhaseStart(context); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onExpandSearchStart listener [{}] failed", listener), e); @@ -250,8 +222,6 @@ public void onExpandSearchPhaseStart(SearchPhaseContext context) { public void onExpandSearchPhaseEnd(SearchPhaseContext context, long tookTime) { for (SearchRequestOperationsListener listener : listeners) { try { - expandSearchPhaseEnd = System.nanoTime(); - expandSearchTotal = TimeUnit.NANOSECONDS.toMillis(expandSearchPhaseEnd - expandSearchPhaseStart); listener.onExpandSearchPhaseEnd(context, tookTime); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onExpandSearchEnd listener [{}] failed", listener), e); diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 907c9b4cabc90..b229e7fcd25e3 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -350,7 +350,8 @@ public AbstractSearchAsyncAction asyncSearchAction( task, new ArraySearchPhaseResults<>(shardsIts.size()), searchRequest.getMaxConcurrentShardRequests(), - clusters + clusters, + searchListenersList ) { @Override protected void executePhaseOnShard( @@ -378,7 +379,6 @@ boolean buildPointInTimeFromSearchResults() { return includeSearchContext; } }; - returnAbstractSearchAsyncAction.setSearchListenerList(searchListenersList); return returnAbstractSearchAsyncAction; } }, listener); @@ -1168,7 +1168,8 @@ public void run() { } }; }, - clusters + clusters, + searchListenersList ); } else { final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults( @@ -1198,7 +1199,8 @@ public void run() { timeProvider, clusterState, task, - clusters + clusters, + searchListenersList ); break; case QUERY_THEN_FETCH: @@ -1218,13 +1220,13 @@ public void run() { timeProvider, clusterState, task, - clusters + clusters, + searchListenersList ); break; default: throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]"); } - searchAsyncAction.setSearchListenerList(searchListenersList); return searchAsyncAction; } } diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 06ec40549f6d1..b5309437e9d53 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -91,6 +91,7 @@ import java.util.stream.IntStream; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; @@ -181,7 +182,8 @@ private AbstractSearchAsyncAction createAction( null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, SearchPhaseContext context) { @@ -334,44 +336,29 @@ public void testSendSearchResponseDisallowPartialFailures() { } public void testOnPhaseListenersFailure() { - SearchCoordinatorStatsTesting searchCoordinatorStatsTesting = new SearchCoordinatorStatsTesting(); - SearchRequestOperationsListener testListener = createSearchRequestOperationsListener(searchCoordinatorStatsTesting); - final List requestOperationListeners = new ArrayList<>(Arrays.asList(testListener, testListener)); - - SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(); - action.setSearchListenerList(requestOperationListeners); - action.setCurrentPhase(action); + SearchCoordinatorStatsTesting searchCoordinatorMockStats = new SearchCoordinatorStatsTesting(); + SearchRequestOperationsListener testListener = createSearchRequestOperationsListener(searchCoordinatorMockStats); + final List requestOperationListeners = new ArrayList<>(List.of(testListener)); + SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners); + action.start(); action.onPhaseFailure(new SearchPhase("test") { @Override public void run() { } }, "message", null); - assertEquals(2, searchCoordinatorStatsTesting.queryPhaseFailure.get()); + assertEquals(1, searchCoordinatorMockStats.queryPhaseFailure.get()); - SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(); - searchDfsQueryThenFetchAsyncAction.setSearchListenerList(requestOperationListeners); - searchDfsQueryThenFetchAsyncAction.setCurrentPhase(searchDfsQueryThenFetchAsyncAction); + SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(requestOperationListeners); + searchDfsQueryThenFetchAsyncAction.start(); searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase("test") { @Override public void run() { } }, "message", null); - assertEquals(2, searchCoordinatorStatsTesting.dfsPreQueryPhaseFailure.get()); - - CanMatchPreFilterSearchPhase canMatchPreFilterSearchPhaseAction = createCanMatchPreFilterSearchPhase(); - canMatchPreFilterSearchPhaseAction.setSearchListenerList(requestOperationListeners); - canMatchPreFilterSearchPhaseAction.setCurrentPhase(canMatchPreFilterSearchPhaseAction); - - canMatchPreFilterSearchPhaseAction.onPhaseFailure(new SearchPhase("test") { - @Override - public void run() { - - } - }, "message", null); - assertEquals(2, searchCoordinatorStatsTesting.canMatchPhaseFailure.get()); - + assertEquals(1, searchCoordinatorMockStats.dfsPreQueryPhaseFailure.get()); + FetchSearchPhase fetchPhase = createFetchSearchPhase(); ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); @@ -384,7 +371,7 @@ public void run() { } }, "message", null); - assertEquals(2, searchCoordinatorStatsTesting.fetchPhaseFailure.get()); + assertEquals(1, searchCoordinatorMockStats.fetchPhaseFailure.get()); } public void testOnPhaseFailure() { @@ -603,22 +590,20 @@ public void onFailure(Exception e) { assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length)); } - public void testSearchRequestListeners() { - SearchCoordinatorStatsTesting searchCoordinatorStatsTesting = new SearchCoordinatorStatsTesting(); - SearchRequestOperationsListener testListener = createSearchRequestOperationsListener(searchCoordinatorStatsTesting); - final List requestOperationListeners = new ArrayList<>(Arrays.asList(testListener, testListener)); + public void testSearchRequestListeners() throws InterruptedException { + SearchCoordinatorStats testListener = new SearchCoordinatorStats(); + final List requestOperationListeners = new ArrayList<>(List.of(testListener)); - SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(); - action.setSearchListenerList(requestOperationListeners); + SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners); - SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(); - searchDfsQueryThenFetchAsyncAction.setSearchListenerList(requestOperationListeners); + SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(requestOperationListeners); - CanMatchPreFilterSearchPhase canMatchPreFilterSearchPhaseAction = createCanMatchPreFilterSearchPhase(); - canMatchPreFilterSearchPhaseAction.setSearchListenerList(requestOperationListeners); + CanMatchPreFilterSearchPhase canMatchPreFilterSearchPhaseAction = createCanMatchPreFilterSearchPhase(requestOperationListeners); + long delay = (int)Math.floor(Math.random() * (5 - 1 + 1) + 1); action.start(); - assertEquals(2, searchCoordinatorStatsTesting.queryPhaseStart.get()); + assertEquals(1, testListener.totalStats.queryCurrent.count()); + TimeUnit.SECONDS.sleep(delay); FetchSearchPhase fetchPhase = createFetchSearchPhase(); ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); @@ -627,27 +612,36 @@ public void testSearchRequestListeners() { action.skipShard(searchShardIterator); action.executeNextPhase(action, fetchPhase); - assertEquals(2, searchCoordinatorStatsTesting.queryPhaseEnd.get()); + assertThat(testListener.totalStats.queryMetric.sum(), greaterThan(1000 * delay)); + assertEquals(testListener.totalStats.queryTotal.count(), 1); - searchDfsQueryThenFetchAsyncAction.start(); - searchDfsQueryThenFetchAsyncAction.skipShard(searchShardIterator); - searchDfsQueryThenFetchAsyncAction.executeNextPhase(searchDfsQueryThenFetchAsyncAction, fetchPhase); - - canMatchPreFilterSearchPhaseAction.start(); - - assertEquals(2, searchCoordinatorStatsTesting.dfsPreQueryPhaseStart.get()); - assertEquals(2, searchCoordinatorStatsTesting.dfsPreQueryPhaseEnd.get()); - assertEquals(2, searchCoordinatorStatsTesting.canMatchPhaseStart.get()); - assertEquals(4, searchCoordinatorStatsTesting.fetchPhaseStart.get()); + assertEquals(1, testListener.totalStats.fetchCurrent.count()); + TimeUnit.SECONDS.sleep(delay); ExpandSearchPhase expandPhase = createExpandSearchPhase(); action.executeNextPhase(fetchPhase, expandPhase); - assertEquals(2, searchCoordinatorStatsTesting.expandPhaseStart.get()); - action.sendSearchResponse(InternalSearchResponse.empty(), null); + TimeUnit.SECONDS.sleep(delay); + assertThat(testListener.totalStats.fetchMetric.sum(), greaterThan(1000 * delay)); + assertEquals(testListener.totalStats.fetchTotal.count(), 1); + assertEquals(1, testListener.totalStats.expandSearchCurrent.count()); + + action.executeNextPhase(expandPhase, fetchPhase); + assertThat(testListener.totalStats.expandSearchMetric.sum(), greaterThan(1000 * delay)); + assertEquals(testListener.totalStats.expandSearchTotal.count(), 1); + + searchDfsQueryThenFetchAsyncAction.start(); + assertEquals(1, testListener.totalStats.dfsPreQueryCurrent.count()); + TimeUnit.SECONDS.sleep(delay); + + searchDfsQueryThenFetchAsyncAction.skipShard(searchShardIterator); + searchDfsQueryThenFetchAsyncAction.executeNextPhase(searchDfsQueryThenFetchAsyncAction, fetchPhase); + + assertThat(testListener.totalStats.dfsPreQueryMetric.sum(), greaterThan(1000 * delay)); + assertEquals(testListener.totalStats.dfsPreQueryTotal.count(), 1); } - private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction() { + private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction(List searchRequestOperationsListeners) { SearchPhaseController controller = new SearchPhaseController( writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder() @@ -686,11 +680,12 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct null, null, task, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + searchRequestOperationsListeners ); } - private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction() { + private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction(List searchRequestOperationsListeners) { SearchPhaseController controller = new SearchPhaseController( writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder() @@ -728,7 +723,8 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction() null, null, task, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + searchRequestOperationsListeners ) { @Override ShardSearchFailure[] buildShardFailures() { @@ -737,13 +733,12 @@ ShardSearchFailure[] buildShardFailures() { @Override public void sendSearchResponse(InternalSearchResponse internalSearchResponse, AtomicArray queryResults) { - onPhaseEnd(this); - setCurrentPhase(null); + start(); } }; } - private CanMatchPreFilterSearchPhase createCanMatchPreFilterSearchPhase() { + private CanMatchPreFilterSearchPhase createCanMatchPreFilterSearchPhase(List searchRequestOperationsListeners) { final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider( 0, System.nanoTime(), @@ -786,7 +781,8 @@ private CanMatchPreFilterSearchPhase createCanMatchPreFilterSearchPhase() { ClusterState.EMPTY_STATE, null, null, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + searchRequestOperationsListeners ); } diff --git a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 0561f81f96ce6..ab61f77550e9f 100644 --- a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -136,7 +136,8 @@ public void run() throws IOException { latch.countDown(); } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); @@ -227,7 +228,8 @@ public void run() throws IOException { latch.countDown(); } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); @@ -317,7 +319,8 @@ public void sendCanMatch( null, new ArraySearchPhaseResults<>(iter.size()), randomIntBetween(1, 32), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override @@ -344,7 +347,8 @@ protected void executePhaseOnShard( } } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); @@ -428,7 +432,8 @@ public void run() { latch.countDown(); } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); @@ -527,7 +532,8 @@ public void run() { latch.countDown(); } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); diff --git a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java index 9ee3b11f05785..af8433e8ca721 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java @@ -135,7 +135,8 @@ public void testSkipSearchShards() throws InterruptedException { null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override @@ -253,7 +254,8 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override @@ -370,7 +372,8 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { TestSearchResponse response = new TestSearchResponse(); @@ -492,7 +495,8 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { TestSearchResponse response = new TestSearchResponse(); @@ -605,9 +609,9 @@ public void testAllowPartialResults() throws InterruptedException { null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { - @Override protected void executePhaseOnShard( SearchShardIterator shardIt, diff --git a/server/src/test/java/org/opensearch/action/search/SearchCoordinatorStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchCoordinatorStatsTests.java index 17e93f5ce16cf..f3f5b095a9dc1 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchCoordinatorStatsTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchCoordinatorStatsTests.java @@ -14,6 +14,41 @@ import java.util.concurrent.Phaser; public class SearchCoordinatorStatsTests extends OpenSearchTestCase { + public void testSearchCoordinatorPhaseFailure() { + SearchCoordinatorStats testCoordinatorStats = new SearchCoordinatorStats(); + SearchPhaseContext ctx = new MockSearchPhaseContext(1); + + testCoordinatorStats.onDFSPreQueryPhaseStart(ctx); + assertEquals(1, testCoordinatorStats.getDFSPreQueryCurrent()); + + testCoordinatorStats.onDFSPreQueryPhaseFailure(ctx); + assertEquals(0, testCoordinatorStats.getDFSPreQueryCurrent()); + + testCoordinatorStats.onCanMatchPhaseStart(ctx); + assertEquals(1, testCoordinatorStats.getCanMatchCurrent()); + + testCoordinatorStats.onCanMatchPhaseFailure(ctx); + assertEquals(0, testCoordinatorStats.getCanMatchCurrent()); + + testCoordinatorStats.onQueryPhaseStart(ctx); + assertEquals(1, testCoordinatorStats.getQueryCurrent()); + + testCoordinatorStats.onQueryPhaseFailure(ctx); + assertEquals(0, testCoordinatorStats.getQueryCurrent()); + + testCoordinatorStats.onFetchPhaseStart(ctx); + assertEquals(1, testCoordinatorStats.getFetchCurrent()); + + testCoordinatorStats.onFetchPhaseFailure(ctx); + assertEquals(0, testCoordinatorStats.getFetchCurrent()); + + testCoordinatorStats.onExpandSearchPhaseStart(ctx); + assertEquals(1, testCoordinatorStats.getExpandSearchCurrent()); + + testCoordinatorStats.onExpandSearchPhaseFailure(ctx); + assertEquals(0, testCoordinatorStats.getExpandSearchCurrent()); + } + public void testSearchCoordinatorStats() { SearchCoordinatorStats testCoordinatorStats = new SearchCoordinatorStats(); diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 668912b5bc718..bb15f2a5edab5 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -214,7 +214,8 @@ public void sendExecuteQuery( timeProvider, null, task, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { From bc0670dbeb9cb464ad48c6cefe7addda49ef8a71 Mon Sep 17 00:00:00 2001 From: sahil buddharaju Date: Tue, 18 Jul 2023 11:24:55 -0700 Subject: [PATCH 09/45] Added more revisions Signed-off-by: sahil buddharaju --- .idea/runConfigurations/Debug_OpenSearch.xml | 2 +- .../action/search/SearchCoordinatorStats.java | 36 +++++++++---------- .../SearchRequestOperationsListener.java | 16 --------- .../AbstractSearchAsyncActionTests.java | 24 +++++++++---- 4 files changed, 34 insertions(+), 44 deletions(-) diff --git a/.idea/runConfigurations/Debug_OpenSearch.xml b/.idea/runConfigurations/Debug_OpenSearch.xml index 0d8bf59823acf..2e167812615e1 100644 --- a/.idea/runConfigurations/Debug_OpenSearch.xml +++ b/.idea/runConfigurations/Debug_OpenSearch.xml @@ -8,4 +8,4 @@