Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Support dynamically adding SearchRequestOperationsListener (#11526) #12192

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Extract cluster management for integration tests into JUnit test rule out of OpenSearchIntegTestCase ([#11877](https://github.com/opensearch-project/OpenSearch/pull/11877)), ([#12000](https://github.com/opensearch-project/OpenSearch/pull/12000))
- Workaround for https://bugs.openjdk.org/browse/JDK-8323659 regression, introduced in JDK-21.0.2 ([#11968](https://github.com/opensearch-project/OpenSearch/pull/11968))
- Updates IpField to be searchable when only `doc_values` are enabled ([#11508](https://github.com/opensearch-project/OpenSearch/pull/11508))
- Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.action.search.SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import java.util.Set;
import java.util.function.Function;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.action.search.SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public final void start() {
0,
0,
buildTookInMillis(),
timeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down Expand Up @@ -672,7 +672,7 @@ protected final SearchResponse buildSearchResponse(
successfulOps.get(),
skippedOps.get(),
buildTookInMillis(),
timeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
failures,
clusters,
searchContextId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;

import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

Expand All @@ -31,18 +29,14 @@
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;

/**
* This constructor is for testing only
*/
SearchRequestContext() {
this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()));
}
private final SearchRequest searchRequest;

SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) {
SearchRequestContext(final SearchRequestOperationsListener searchRequestOperationsListener, final SearchRequest searchRequest) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
this.searchRequest = searchRequest;
}

SearchRequestOperationsListener getSearchRequestOperationsListener() {
Expand All @@ -57,6 +51,14 @@
return phaseTookMap;
}

SearchResponse.PhaseTook getPhaseTook() {
if (searchRequest != null && searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) {
return new SearchResponse.PhaseTook(phaseTookMap);

Check warning on line 56 in server/src/main/java/org/opensearch/action/search/SearchRequestContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestContext.java#L56

Added line #L56 was not covered by tests
} else {
return null;
}
}

/**
* Override absoluteStartNanos set in constructor.
* For testing only
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* SearchRequestOperationsCompositeListenerFactory contains listeners registered to search requests,
* and is responsible for creating the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
*
* @opensearch.internal
*/
public final class SearchRequestOperationsCompositeListenerFactory {
private final List<SearchRequestOperationsListener> searchRequestListenersList;

/**
* Create the SearchRequestOperationsCompositeListenerFactory and add multiple {@link SearchRequestOperationsListener}
* to the searchRequestListenersList.
* Those enabled listeners will be executed during each search request.
*
* @param listeners Multiple SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if any input listener is null.
*/
public SearchRequestOperationsCompositeListenerFactory(final SearchRequestOperationsListener... listeners) {
searchRequestListenersList = new ArrayList<>();
for (SearchRequestOperationsListener listener : listeners) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");

Check warning on line 42 in server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java#L42

Added line #L42 was not covered by tests
}
searchRequestListenersList.add(listener);
}
}

/**
* Get searchRequestListenersList,
*
* @return List of SearchRequestOperationsListener
*/
public List<SearchRequestOperationsListener> getListeners() {
return searchRequestListenersList;
}

/**
* Create the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
* @param searchRequest The SearchRequest object used to decide which request-level listeners to add based on states/flags
* @param logger Logger to be attached to the {@link SearchRequestOperationsListener.CompositeListener}
* @param perRequestListeners the per-request listeners that can be optionally added to the returned CompositeListener list.
* @return SearchRequestOperationsListener.CompositeListener
*/
public SearchRequestOperationsListener.CompositeListener buildCompositeListener(
final SearchRequest searchRequest,
final Logger logger,
final SearchRequestOperationsListener... perRequestListeners
) {
final List<SearchRequestOperationsListener> searchListenersList = Stream.concat(
searchRequestListenersList.stream(),
Arrays.stream(perRequestListeners)
)
.filter((searchRequestOperationsListener -> searchRequestOperationsListener.isEnabled(searchRequest)))
.collect(Collectors.toList());

return new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@
* @opensearch.internal
*/
@InternalApi
abstract class SearchRequestOperationsListener {
public abstract class SearchRequestOperationsListener {
private volatile boolean enabled;

protected SearchRequestOperationsListener() {
this.enabled = true;
}

protected SearchRequestOperationsListener(final boolean enabled) {
this.enabled = enabled;
}

Check warning on line 32 in server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java#L30-L32

Added lines #L30 - L32 were not covered by tests

abstract void onPhaseStart(SearchPhaseContext context);

Expand All @@ -32,6 +41,18 @@

void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

boolean isEnabled(SearchRequest searchRequest) {
return isEnabled();
}

boolean isEnabled() {
return enabled;
}

protected void setEnabled(final boolean enabled) {
this.enabled = enabled;
}

/**
* Holder of Composite Listeners
*
Expand Down Expand Up @@ -101,5 +122,9 @@
}
}
}

public List<SearchRequestOperationsListener> getListeners() {
return listeners;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ public SearchRequestSlowLog(ClusterService clusterService) {
this.logger = logger;
Loggers.setLevel(this.logger, SlowLogLevel.TRACE.name());

this.warnThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING).nanos();
this.infoThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING).nanos();
this.debugThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING).nanos();
this.traceThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING).nanos();
this.level = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL);
this.setWarnThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING));
this.setInfoThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING));
this.setDebugThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING));
this.setTraceThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING));
this.setLevel(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL));

clusterService.getClusterSettings()
.addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING, this::setWarnThreshold);
Expand Down Expand Up @@ -233,18 +233,22 @@ private static String escapeJson(String text) {

void setWarnThreshold(TimeValue warnThreshold) {
this.warnThreshold = warnThreshold.nanos();
setEnabledIfThresholdExceed();
}

void setInfoThreshold(TimeValue infoThreshold) {
this.infoThreshold = infoThreshold.nanos();
setEnabledIfThresholdExceed();
}

void setDebugThreshold(TimeValue debugThreshold) {
this.debugThreshold = debugThreshold.nanos();
setEnabledIfThresholdExceed();
}

void setTraceThreshold(TimeValue traceThreshold) {
this.traceThreshold = traceThreshold.nanos();
setEnabledIfThresholdExceed();
}

void setLevel(SlowLogLevel level) {
Expand All @@ -270,4 +274,8 @@ protected long getTraceThreshold() {
SlowLogLevel getLevel() {
return level;
}

private void setEnabledIfThresholdExceed() {
super.setEnabled(this.warnThreshold >= 0 || this.debugThreshold >= 0 || this.infoThreshold >= 0 || this.traceThreshold >= 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;

import java.util.EnumMap;
import java.util.Map;
Expand All @@ -26,8 +28,18 @@
public final class SearchRequestStats extends SearchRequestOperationsListener {
Map<SearchPhaseName, StatsHolder> phaseStatsMap = new EnumMap<>(SearchPhaseName.class);

public static final String SEARCH_REQUEST_STATS_ENABLED_KEY = "search.request_stats_enabled";
public static final Setting<Boolean> SEARCH_REQUEST_STATS_ENABLED = Setting.boolSetting(
SEARCH_REQUEST_STATS_ENABLED_KEY,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

@Inject
public SearchRequestStats() {
public SearchRequestStats(ClusterSettings clusterSettings) {
this.setEnabled(clusterSettings.get(SEARCH_REQUEST_STATS_ENABLED));
clusterSettings.addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
phaseStatsMap.put(searchPhaseName, new StatsHolder());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ final class SearchResponseMerger {
/**
* Add a search response to the list of responses to be merged together into one.
* Merges currently happen at once when all responses are available and
* {@link #getMergedResponse(SearchResponse.Clusters)} )} is called.
* {@link #getMergedResponse(SearchResponse.Clusters, SearchRequestContext)} )} is called.
* That may change in the future as it's possible to introduce incremental merges as responses come in if necessary.
*/
void add(SearchResponse searchResponse) {
Expand All @@ -126,7 +126,7 @@ int numResponses() {
* Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)}
* so that all responses are merged into a single one.
*/
SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
SearchResponse getMergedResponse(SearchResponse.Clusters clusters, SearchRequestContext searchRequestContext) {
// if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true,
// we end up calling merge without anything to merge, we just return an empty search response
if (searchResponses.size() == 0) {
Expand Down Expand Up @@ -236,7 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
successfulShards,
skippedShards,
tookInMillis,
searchTimeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
shardFailures,
clusters,
null
Expand Down
Loading
Loading