Skip to content

Commit

Permalink
Search latency tracking - Coordinator node (#8386) (#10279)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgup432 authored Sep 29, 2023
1 parent 94173e3 commit befcde9
Show file tree
Hide file tree
Showing 31 changed files with 1,052 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add coordinator level stats for search latency ([#8386](https://github.com/opensearch-project/OpenSearch/issues/8386))
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -56,9 +57,11 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
Expand All @@ -74,6 +77,7 @@ public void testSearchWithWRRShardRouting() throws IOException {
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" + ".values", "a,b,c")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.put("cluster.routing.weighted.fail_open", false)
.put(SEARCH_REQUEST_STATS_ENABLED_KEY, true)
.build();

logger.info("--> starting 6 nodes on different zones");
Expand Down Expand Up @@ -180,12 +184,39 @@ public void testSearchWithWRRShardRouting() throws IOException {
assertFalse(!hitNodes.contains(nodeId));
}
nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
int num = 0;
int coordNumber = 0;

for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (searchStats.getRequestStatsLongHolder()
.getRequestStatsHolder()
.get(SearchPhaseName.QUERY.getName())
.getTimeInMillis() > 0) {
assertThat(
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTotal(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal(),
greaterThan(0L)
);
coordNumber += 1;
}
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Assert.assertTrue(searchStats.getFetchCount() > 0L);
num++;
}
assertThat(coordNumber, greaterThan(0));
assertThat(num, greaterThan(0));
}

private Map<String, List<String>> setupCluster(int nodeCountPerAZ, Settings commonSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.GroupShardsIterator;
Expand All @@ -63,6 +64,7 @@
import java.util.Set;
import java.util.function.Function;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
Expand All @@ -78,7 +80,7 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@OpenSearchIntegTestCase.ClusterScope(minNumDataNodes = 2)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 2)
public class SearchStatsIT extends ParameterizedOpenSearchIntegTestCase {

public SearchStatsIT(Settings dynamicSettings) {
Expand Down Expand Up @@ -126,6 +128,11 @@ public void testSimpleStats() throws Exception {
assertThat(numNodes, greaterThanOrEqualTo(2));
final int shardsIdx1 = randomIntBetween(1, 10); // we make sure each node gets at least a single shard...
final int shardsIdx2 = Math.max(numNodes - shardsIdx1, randomIntBetween(1, 10));
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(SEARCH_REQUEST_STATS_ENABLED_KEY, true).build())
.get();
assertThat(numNodes, lessThanOrEqualTo(shardsIdx1 + shardsIdx2));
assertAcked(
prepareCreate("test1").setSettings(
Expand Down Expand Up @@ -188,20 +195,40 @@ public void testSimpleStats() throws Exception {

Set<String> nodeIdsWithIndex = nodeIdsWithIndex("test1", "test2");
int num = 0;
int numOfCoordinators = 0;

for (NodeStats stat : nodeStats.getNodes()) {
Stats total = stat.getIndices().getSearch().getTotal();
if (total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTimeInMillis() > 0) {
assertThat(
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(),
greaterThan(0L)
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal()
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal()
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal()
);
numOfCoordinators += 1;
}
if (nodeIdsWithIndex.contains(stat.getNode().getId())) {
assertThat(total.getQueryCount(), greaterThan(0L));
assertThat(total.getQueryTimeInMillis(), greaterThan(0L));
num++;
} else {
assertThat(total.getQueryCount(), equalTo(0L));
assertThat(total.getQueryCount(), greaterThanOrEqualTo(0L));
assertThat(total.getQueryTimeInMillis(), equalTo(0L));
}
}

assertThat(numOfCoordinators, greaterThan(0));
assertThat(num, greaterThan(0));

}

private Set<String> nodeIdsWithIndex(String... indices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -107,7 +108,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final AtomicInteger skippedOps = new AtomicInteger();
private final TransportSearchAction.SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
private final int expectedTotalOps;
Expand All @@ -116,8 +116,12 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;

private SearchPhase currentPhase;

private final List<Releasable> releasables = new ArrayList<>();

private Optional<SearchRequestOperationsListener> searchRequestOperationsListener;

AbstractSearchAsyncAction(
String name,
Logger logger,
Expand All @@ -135,7 +139,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchTask task,
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(name);
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
Expand Down Expand Up @@ -171,6 +176,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.indexRoutings = indexRoutings;
this.results = resultConsumer;
this.clusters = clusters;
this.searchRequestOperationsListener = Optional.ofNullable(searchRequestOperationsListener);
}

@Override
Expand Down Expand Up @@ -371,6 +377,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
: OpenSearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
logger.debug(() -> new ParameterizedMessage("All shards failed for phase: [{}]", getName()), cause);
onPhaseFailure(currentPhase, "all shards failed", cause);

} else {
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
Expand Down Expand Up @@ -419,13 +426,24 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
clusterState.version()
);
}
onPhaseEnd();
executePhase(nextPhase);
}
}

private void onPhaseEnd() {
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseEnd(this); });
}

private void onPhaseStart(SearchPhase phase) {
setCurrentPhase(phase);
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseStart(this); });
}

private void executePhase(SearchPhase phase) {
try {
phase.run();
onPhaseStart(phase);
phase.recordAndRun();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
Expand Down Expand Up @@ -603,6 +621,14 @@ private void successfulShardExecution(SearchShardIterator shardsIt) {
}
}

public SearchPhase getCurrentPhase() {
return currentPhase;
}

private void setCurrentPhase(SearchPhase phase) {
currentPhase = phase;
}

@Override
public final int getNumShards() {
return results.getNumShards();
Expand Down Expand Up @@ -670,10 +696,13 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
}
listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
}
onPhaseEnd();
setCurrentPhase(null);
}

@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseFailure(this));
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
ClusterState clusterState,
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super(
Expand All @@ -110,7 +111,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
task,
new CanMatchSearchPhaseResults(shardsIts.size()),
shardsIts.size(),
clusters
clusters,
searchRequestOperationsListener
);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final TransportSearchAction.SearchTimeProvider timeProvider,
final ClusterState clusterState,
final SearchTask task,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(
"dfs",
SearchPhaseName.DFS_PRE_QUERY.getName(),
logger,
searchTransportService,
nodeIdToConnection,
Expand All @@ -95,7 +96,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
task,
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters
clusters,
searchRequestOperationsListener
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,23 @@
*
* @opensearch.internal
*/
abstract class SearchPhase implements CheckedRunnable<IOException> {
public abstract class SearchPhase implements CheckedRunnable<IOException> {
private final String name;
private long startTimeInNanos;

protected SearchPhase(String name) {
this.name = Objects.requireNonNull(name, "name must not be null");
}

public long getStartTimeInNanos() {
return startTimeInNanos;
}

public void recordAndRun() throws IOException {
this.startTimeInNanos = System.nanoTime();
run();
}

/**
* Returns the phases name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public interface SearchPhaseContext extends Executor {
*/
SearchRequest getRequest();

SearchPhase getCurrentPhase();

/**
* Builds and sends the final search response back to the user.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* @opensearch.internal
*/
public enum SearchPhaseName {
DFS_PRE_QUERY("dfs_pre_query"),
QUERY("query"),
FETCH("fetch"),
DFS_QUERY("dfs_query"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
final TransportSearchAction.SearchTimeProvider timeProvider,
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(
"query",
SearchPhaseName.QUERY.getName(),
logger,
searchTransportService,
nodeIdToConnection,
Expand All @@ -100,7 +101,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
task,
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters
clusters,
searchRequestOperationsListener
);
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Loading

0 comments on commit befcde9

Please sign in to comment.