From 1402cedf99a0bae87a573c14a6f7b42a5f846705 Mon Sep 17 00:00:00 2001 From: PritLadani Date: Mon, 19 Dec 2022 22:44:20 +0530 Subject: [PATCH] Cancellation of in-flight search requests at coordinator level Signed-off-by: PritLadani --- .../common/settings/ClusterSettings.java | 9 +- .../SearchBackpressureService.java | 125 +++++++++++++----- .../settings/SearchBackpressureSettings.java | 6 + .../settings/SearchTaskSettings.java | 62 +++++++++ .../stats/SearchBackpressureStats.java | 16 ++- .../backpressure/stats/SearchTaskStats.java | 100 ++++++++++++++ .../trackers/CpuUsageTracker.java | 47 ++++++- .../trackers/ElapsedTimeTracker.java | 44 +++++- .../trackers/HeapUsageTracker.java | 110 +++++++++++++-- .../trackers/TaskResourceUsageTracker.java | 28 +++- .../SearchBackpressureServiceTests.java | 29 ++-- .../stats/SearchBackpressureStatsTests.java | 1 + .../stats/SearchTaskStatsTests.java | 44 ++++++ .../tasks/TaskCancellationTests.java | 24 ++-- 14 files changed, 565 insertions(+), 80 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/backpressure/settings/SearchTaskSettings.java create mode 100644 server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java create mode 100644 server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 7e9c7bd3123c5..8bf979c05ef30 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -44,6 +44,7 @@ import org.opensearch.search.backpressure.settings.NodeDuressSettings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import 
org.opensearch.search.backpressure.settings.SearchTaskSettings; import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; @@ -601,11 +602,17 @@ public void apply(Settings value, Settings current, Settings previous) { NodeDuressSettings.SETTING_CPU_THRESHOLD, NodeDuressSettings.SETTING_HEAP_THRESHOLD, SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD, + HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY, HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD, + HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD_FOR_SEARCH_QUERY, HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD, + HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE_FOR_SEARCH_QUERY, HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, + CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY, CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD, - ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD + ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY, + ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, + SearchTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY ) ) ); diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index fd13198b957da..2465790176daa 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.search.SearchTask; import org.opensearch.common.component.AbstractLifecycleComponent; import 
org.opensearch.common.util.TokenBucket; import org.opensearch.monitor.jvm.JvmStats; @@ -20,6 +21,7 @@ import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.stats.SearchBackpressureStats; import org.opensearch.search.backpressure.stats.SearchShardTaskStats; +import org.opensearch.search.backpressure.stats.SearchTaskStats; import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; @@ -37,7 +39,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; @@ -68,9 +72,12 @@ public class SearchBackpressureService extends AbstractLifecycleComponent private final AtomicReference taskCancellationRateLimiter = new AtomicReference<>(); private final AtomicReference taskCancellationRatioLimiter = new AtomicReference<>(); - // Currently, only the state of SearchShardTask is being tracked. - // This can be generalized to Map once we start supporting cancellation of SearchTasks as well. 
- private final SearchBackpressureState state = new SearchBackpressureState(); + private final Map, SearchBackpressureState> searchBackpressureStates = new HashMap<>() { + { + put(SearchTask.class, new SearchBackpressureState()); + put(SearchShardTask.class, new SearchBackpressureState()); + } + }; public SearchBackpressureService( SearchBackpressureSettings settings, @@ -116,10 +123,15 @@ public SearchBackpressureService( ); this.taskCancellationRatioLimiter.set( - new TokenBucket(state::getCompletionCount, getSettings().getCancellationRatio(), getSettings().getCancellationBurst()) + new TokenBucket(this::getTaskCompletionCount, getSettings().getCancellationRatio(), getSettings().getCancellationBurst()) ); } + private long getTaskCompletionCount() { + return searchBackpressureStates.get(SearchTask.class).getCompletionCount() + searchBackpressureStates.get(SearchShardTask.class) + .getCompletionCount(); + } + void doRun() { SearchBackpressureMode mode = getSettings().getMode(); if (mode == SearchBackpressureMode.DISABLED) { @@ -130,18 +142,29 @@ void doRun() { return; } - // We are only targeting in-flight cancellation of SearchShardTask for now. - List searchShardTasks = getSearchShardTasks(); + List searchTasks = getSearchTasks(); + List searchShardTasks = getSearchShardTasks(); + List cancellableTasks = new ArrayList<>(); // Force-refresh usage stats of these tasks before making a cancellation decision. + taskResourceTrackingService.refreshResourceStats(searchTasks.toArray(new Task[0])); taskResourceTrackingService.refreshResourceStats(searchShardTasks.toArray(new Task[0])); - // Skip cancellation if the increase in heap usage is not due to search requests. 
- if (isHeapUsageDominatedBySearch(searchShardTasks) == false) { + // Check if increase in heap usage is due to SearchTasks + if (isHeapUsageDominatedBySearch(searchTasks, getSettings().getSearchTaskSettings().getTotalHeapBytesThreshold())) { + cancellableTasks.addAll(searchTasks); + } + + // Check if increase in heap usage is due to SearchShardTasks + if (isHeapUsageDominatedBySearch(searchShardTasks, getSettings().getSearchShardTaskSettings().getTotalHeapBytesThreshold())) { + cancellableTasks.addAll(searchShardTasks); + } + + if (cancellableTasks.isEmpty()) { return; } - for (TaskCancellation taskCancellation : getTaskCancellations(searchShardTasks)) { + for (TaskCancellation taskCancellation : getTaskCancellations(cancellableTasks)) { logger.debug( "[{} mode] cancelling task [{}] due to high resource consumption [{}]", mode.getName(), @@ -160,7 +183,10 @@ void doRun() { // Stop cancelling tasks if there are no tokens in either of the two token buckets. if (rateLimitReached && ratioLimitReached) { logger.debug("task cancellation limit reached"); - state.incrementLimitReachedCount(); + SearchBackpressureState searchBackpressureState = searchBackpressureStates.get( + (taskCancellation.getTask() instanceof SearchTask) ? SearchTask.class : SearchShardTask.class + ); + searchBackpressureState.incrementLimitReachedCount(); break; } @@ -187,9 +213,8 @@ boolean isNodeInDuress() { /** * Returns true if the increase in heap usage is due to search requests. 
*/ - boolean isHeapUsageDominatedBySearch(List searchShardTasks) { - long usage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum(); - long threshold = getSettings().getSearchShardTaskSettings().getTotalHeapBytesThreshold(); + boolean isHeapUsageDominatedBySearch(List cancellableTasks, long threshold) { + long usage = cancellableTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum(); if (usage < threshold) { logger.debug("heap usage not dominated by search requests [{}/{}]", usage, threshold); return false; @@ -201,7 +226,7 @@ boolean isHeapUsageDominatedBySearch(List searchShardTasks) { /** * Filters and returns the list of currently running SearchShardTasks. */ - List getSearchShardTasks() { + List getSearchShardTasks() { return taskResourceTrackingService.getResourceAwareTasks() .values() .stream() @@ -210,6 +235,18 @@ List getSearchShardTasks() { .collect(Collectors.toUnmodifiableList()); } + /** + * Filters and returns the list of currently running SearchTasks. + */ + List getSearchTasks() { + return taskResourceTrackingService.getResourceAwareTasks() + .values() + .stream() + .filter(task -> task instanceof SearchTask) + .map(task -> (SearchTask) task) + .collect(Collectors.toUnmodifiableList()); + } + /** * Returns a TaskCancellation wrapper containing the list of reasons (possibly zero), along with an overall * cancellation score for the given task. 
Cancelling a task with a higher score has better chance of recovering the @@ -222,13 +259,19 @@ TaskCancellation getTaskCancellation(CancellableTask task) { for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) { Optional reason = tracker.checkAndMaybeGetCancellationReason(task); if (reason.isPresent()) { + if (task instanceof SearchTask) { + callbacks.add(tracker::incrementSearchTaskCancellations); + } else { + callbacks.add(tracker::incrementSearchShardTaskCancellations); + } reasons.add(reason.get()); - callbacks.add(tracker::incrementCancellations); } } - if (task instanceof SearchShardTask) { - callbacks.add(state::incrementCancellationCount); + if (task instanceof SearchTask) { + callbacks.add(searchBackpressureStates.get(SearchTask.class)::incrementCancellationCount); + } else { + callbacks.add(searchBackpressureStates.get(SearchShardTask.class)::incrementCancellationCount); } return new TaskCancellation(task, reasons, callbacks); @@ -249,8 +292,12 @@ SearchBackpressureSettings getSettings() { return settings; } - SearchBackpressureState getState() { - return state; + SearchBackpressureState getSearchTasksState() { + return searchBackpressureStates.get(SearchTask.class); + } + + SearchBackpressureState getSearchShardTasksState() { + return searchBackpressureStates.get(SearchShardTask.class); } @Override @@ -259,19 +306,22 @@ public void onTaskCompleted(Task task) { return; } - if (task instanceof SearchShardTask == false) { + if (task instanceof SearchTask == false && task instanceof SearchShardTask == false) { return; } - SearchShardTask searchShardTask = (SearchShardTask) task; - if (searchShardTask.isCancelled() == false) { - state.incrementCompletionCount(); + CancellableTask cancellableTask = (CancellableTask) task; + SearchBackpressureState searchBackpressureState = searchBackpressureStates.get( + (task instanceof SearchTask) ? 
SearchTask.class : SearchShardTask.class + ); + if (cancellableTask.isCancelled() == false) { + searchBackpressureState.incrementCompletionCount(); } List exceptions = new ArrayList<>(); for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) { try { - tracker.update(searchShardTask); + tracker.update(task); } catch (Exception e) { exceptions.add(e); } @@ -282,7 +332,7 @@ public void onTaskCompleted(Task task) { @Override public void onCancellationRatioChanged() { taskCancellationRatioLimiter.set( - new TokenBucket(state::getCompletionCount, getSettings().getCancellationRatio(), getSettings().getCancellationBurst()) + new TokenBucket(this::getTaskCompletionCount, getSettings().getCancellationRatio(), getSettings().getCancellationBurst()) ); } @@ -321,15 +371,30 @@ protected void doStop() { protected void doClose() throws IOException {} public SearchBackpressureStats nodeStats() { - List searchShardTasks = getSearchShardTasks(); + List searchTasks = getSearchTasks(); + List searchShardTasks = getSearchShardTasks(); + + SearchTaskStats searchTaskStats = new SearchTaskStats( + searchBackpressureStates.get(SearchTask.class).getCancellationCount(), + searchBackpressureStates.get(SearchTask.class).getLimitReachedCount(), + taskResourceUsageTrackers.stream() + .collect( + Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.searchTaskStats(searchTasks)) + ) + ); SearchShardTaskStats searchShardTaskStats = new SearchShardTaskStats( - state.getCancellationCount(), - state.getLimitReachedCount(), + searchBackpressureStates.get(SearchShardTask.class).getCancellationCount(), + searchBackpressureStates.get(SearchShardTask.class).getLimitReachedCount(), taskResourceUsageTrackers.stream() - .collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks))) + .collect( + Collectors.toUnmodifiableMap( + t -> TaskResourceUsageTrackerType.fromName(t.name()), + t -> 
t.searchShardTaskStats(searchShardTasks) + ) + ) ); - return new SearchBackpressureStats(searchShardTaskStats, getSettings().getMode()); + return new SearchBackpressureStats(searchTaskStats, searchShardTaskStats, getSettings().getMode()); } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java index df2c04a730fbc..3906228389729 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java @@ -110,12 +110,14 @@ public interface Listener { private final Settings settings; private final ClusterSettings clusterSettings; private final NodeDuressSettings nodeDuressSettings; + private final SearchTaskSettings searchTaskSettings; private final SearchShardTaskSettings searchShardTaskSettings; public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSettings) { this.settings = settings; this.clusterSettings = clusterSettings; this.nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings); + this.searchTaskSettings = new SearchTaskSettings(settings, clusterSettings); this.searchShardTaskSettings = new SearchShardTaskSettings(settings, clusterSettings); interval = new TimeValue(SETTING_INTERVAL_MILLIS.get(settings)); @@ -149,6 +151,10 @@ public NodeDuressSettings getNodeDuressSettings() { return nodeDuressSettings; } + public SearchTaskSettings getSearchTaskSettings() { + return searchTaskSettings; + } + public SearchShardTaskSettings getSearchShardTaskSettings() { return searchShardTaskSettings; } diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchTaskSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchTaskSettings.java new file mode 100644 index 0000000000000..f28cdd17a3cff 
--- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchTaskSettings.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.settings; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.monitor.jvm.JvmStats; + +/** + * Defines the settings related to the cancellation of SearchTasks. + * + * @opensearch.internal + */ + +public class SearchTaskSettings { + private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); + + private static class Defaults { + private static final double TOTAL_HEAP_PERCENT_THRESHOLD = 0.05; + } + + /** + * Defines the heap usage threshold (in percentage) for the sum of heap usages across all search tasks + * before in-flight cancellation is applied. 
+ */ + private volatile double totalHeapPercentThreshold; + public static final Setting SETTING_TOTAL_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY = Setting.doubleSetting( + "search_backpressure.search_task.total_heap_percent_threshold", + Defaults.TOTAL_HEAP_PERCENT_THRESHOLD, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public SearchTaskSettings(Settings settings, ClusterSettings clusterSettings) { + totalHeapPercentThreshold = SETTING_TOTAL_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY.get(settings); + clusterSettings.addSettingsUpdateConsumer( + SETTING_TOTAL_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY, + this::setTotalHeapPercentThreshold + ); + } + + public double getTotalHeapPercentThreshold() { + return totalHeapPercentThreshold; + } + + public long getTotalHeapBytesThreshold() { + return (long) (HEAP_SIZE_BYTES * getTotalHeapPercentThreshold()); + } + + private void setTotalHeapPercentThreshold(double totalHeapPercentThreshold) { + this.totalHeapPercentThreshold = totalHeapPercentThreshold; + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchBackpressureStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchBackpressureStats.java index 3aec0dfc579c5..92a52b62477f2 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchBackpressureStats.java +++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchBackpressureStats.java @@ -22,21 +22,28 @@ * Stats related to search backpressure. 
*/ public class SearchBackpressureStats implements ToXContentFragment, Writeable { + private final SearchTaskStats searchTaskStats; private final SearchShardTaskStats searchShardTaskStats; private final SearchBackpressureMode mode; - public SearchBackpressureStats(SearchShardTaskStats searchShardTaskStats, SearchBackpressureMode mode) { + public SearchBackpressureStats( + SearchTaskStats searchTaskStats, + SearchShardTaskStats searchShardTaskStats, + SearchBackpressureMode mode + ) { + this.searchTaskStats = searchTaskStats; this.searchShardTaskStats = searchShardTaskStats; this.mode = mode; } public SearchBackpressureStats(StreamInput in) throws IOException { - this(new SearchShardTaskStats(in), SearchBackpressureMode.fromName(in.readString())); + this(new SearchTaskStats(in), new SearchShardTaskStats(in), SearchBackpressureMode.fromName(in.readString())); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { return builder.startObject("search_backpressure") + .field("search_task", searchTaskStats) .field("search_shard_task", searchShardTaskStats) .field("mode", mode.getName()) .endObject(); @@ -44,6 +51,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { + searchTaskStats.writeTo(out); searchShardTaskStats.writeTo(out); out.writeString(mode.getName()); } @@ -53,11 +61,11 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SearchBackpressureStats that = (SearchBackpressureStats) o; - return searchShardTaskStats.equals(that.searchShardTaskStats) && mode == that.mode; + return searchTaskStats.equals(that.searchTaskStats) && searchShardTaskStats.equals(that.searchShardTaskStats) && mode == that.mode; } @Override public int hashCode() { - return Objects.hash(searchShardTaskStats, mode); + return Objects.hash(searchTaskStats, 
searchShardTaskStats, mode); } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java new file mode 100644 index 0000000000000..87318a60b46fd --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.stats; + +import org.opensearch.common.collect.MapBuilder; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.search.backpressure.trackers.CpuUsageTracker; +import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; +import org.opensearch.search.backpressure.trackers.HeapUsageTracker; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +/** + * Stats related to cancelled search tasks. 
+ */ + +public class SearchTaskStats implements ToXContentObject, Writeable { + private final long cancellationCount; + private final long limitReachedCount; + private final Map resourceUsageTrackerStats; + + public SearchTaskStats( + long cancellationCount, + long limitReachedCount, + Map resourceUsageTrackerStats + ) { + this.cancellationCount = cancellationCount; + this.limitReachedCount = limitReachedCount; + this.resourceUsageTrackerStats = resourceUsageTrackerStats; + } + + public SearchTaskStats(StreamInput in) throws IOException { + this.cancellationCount = in.readVLong(); + this.limitReachedCount = in.readVLong(); + + MapBuilder builder = new MapBuilder<>(); + builder.put(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, in.readOptionalWriteable(CpuUsageTracker.Stats::new)); + builder.put(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, in.readOptionalWriteable(HeapUsageTracker.Stats::new)); + builder.put(TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER, in.readOptionalWriteable(ElapsedTimeTracker.Stats::new)); + this.resourceUsageTrackerStats = builder.immutableMap(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + + builder.startObject("resource_tracker_stats"); + for (Map.Entry entry : resourceUsageTrackerStats.entrySet()) { + builder.field(entry.getKey().getName(), entry.getValue()); + } + builder.endObject(); + + builder.startObject("cancellation_stats") + .field("cancellation_count", cancellationCount) + .field("cancellation_limit_reached_count", limitReachedCount) + .endObject(); + + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(cancellationCount); + out.writeVLong(limitReachedCount); + + out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER)); + 
out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER)); + out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER)); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SearchTaskStats that = (SearchTaskStats) o; + return cancellationCount == that.cancellationCount + && limitReachedCount == that.limitReachedCount + && resourceUsageTrackerStats.equals(that.resourceUsageTrackerStats); + } + + @Override + public int hashCode() { + return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats); + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java index 21bb3af32ae08..1e332eca2649c 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java @@ -8,6 +8,7 @@ package org.opensearch.search.backpressure.trackers; +import org.opensearch.action.search.SearchTask; import org.opensearch.common.settings.Setting; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -32,11 +33,24 @@ */ public class CpuUsageTracker extends TaskResourceUsageTracker { private static class Defaults { + private static final long CPU_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY = 60000; private static final long CPU_TIME_MILLIS_THRESHOLD = 15000; } /** - * Defines the CPU usage threshold (in millis) for an individual task before it is considered for cancellation. + * Defines the CPU usage threshold (in millis) for an individual search task before it is considered for cancellation. 
+ */ + private volatile long cpuTimeMillisThresholdForSearchQuery; + public static final Setting SETTING_CPU_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY = Setting.longSetting( + "search_backpressure.search_task.cpu_time_millis_threshold_for_search_query", + Defaults.CPU_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the CPU usage threshold (in millis) for an individual search shard task before it is considered for cancellation. */ private volatile long cpuTimeMillisThreshold; public static final Setting SETTING_CPU_TIME_MILLIS_THRESHOLD = Setting.longSetting( @@ -48,7 +62,10 @@ private static class Defaults { ); public CpuUsageTracker(SearchBackpressureSettings settings) { + this.cpuTimeMillisThresholdForSearchQuery = SETTING_CPU_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY.get(settings.getSettings()); this.cpuTimeMillisThreshold = SETTING_CPU_TIME_MILLIS_THRESHOLD.get(settings.getSettings()); + settings.getClusterSettings() + .addSettingsUpdateConsumer(SETTING_CPU_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY, this::setCpuTimeMillisThresholdForSearchQuery); settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_CPU_TIME_MILLIS_THRESHOLD, this::setCpuTimeMillisThreshold); } @@ -60,7 +77,7 @@ public String name() { @Override public Optional checkAndMaybeGetCancellationReason(Task task) { long usage = task.getTotalResourceStats().getCpuTimeInNanos(); - long threshold = getCpuTimeNanosThreshold(); + long threshold = (task instanceof SearchTask) ? 
getCpuTimeNanosThresholdForSearchQuery() : getCpuTimeNanosThreshold(); if (usage < threshold) { return Optional.empty(); @@ -78,19 +95,37 @@ public Optional checkAndMaybeGetCancellationReason(Task ); } + public long getCpuTimeNanosThresholdForSearchQuery() { + return TimeUnit.MILLISECONDS.toNanos(cpuTimeMillisThresholdForSearchQuery); + } + public long getCpuTimeNanosThreshold() { return TimeUnit.MILLISECONDS.toNanos(cpuTimeMillisThreshold); } + public void setCpuTimeMillisThresholdForSearchQuery(long cpuTimeMillisThresholdForSearchQuery) { + this.cpuTimeMillisThresholdForSearchQuery = cpuTimeMillisThresholdForSearchQuery; + } + public void setCpuTimeMillisThreshold(long cpuTimeMillisThreshold) { this.cpuTimeMillisThreshold = cpuTimeMillisThreshold; } @Override - public TaskResourceUsageTracker.Stats stats(List activeTasks) { - long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).max().orElse(0); - long currentAvg = (long) activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).average().orElse(0); - return new Stats(getCancellations(), currentMax, currentAvg); + public TaskResourceUsageTracker.Stats searchTaskStats(List searchTasks) { + long currentMax = searchTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).max().orElse(0); + long currentAvg = (long) searchTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).average().orElse(0); + return new Stats(getSearchTaskCancellationCount(), currentMax, currentAvg); + } + + @Override + public TaskResourceUsageTracker.Stats searchShardTaskStats(List searchShardTasks) { + long currentMax = searchShardTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).max().orElse(0); + long currentAvg = (long) searchShardTasks.stream() + .mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()) + .average() + .orElse(0); + return new Stats(getSearchShardTaskCancellationCount(), 
currentMax, currentAvg); } /** diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java index 10e53e2bce5ae..eba8c4ee7afd8 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java @@ -8,6 +8,7 @@ package org.opensearch.search.backpressure.trackers; +import org.opensearch.action.search.SearchTask; import org.opensearch.common.settings.Setting; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -33,11 +34,24 @@ */ public class ElapsedTimeTracker extends TaskResourceUsageTracker { private static class Defaults { + private static final long ELAPSED_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY = 120000; private static final long ELAPSED_TIME_MILLIS_THRESHOLD = 30000; } /** - * Defines the elapsed time threshold (in millis) for an individual task before it is considered for cancellation. + * Defines the elapsed time threshold (in millis) for an individual search task before it is considered for cancellation. + */ + private volatile long elapsedTimeMillisThresholdForSearchQuery; + public static final Setting SETTING_ELAPSED_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY = Setting.longSetting( + "search_backpressure.search_task.elapsed_time_millis_threshold_for_search_query", + Defaults.ELAPSED_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the elapsed time threshold (in millis) for an individual search shard task before it is considered for cancellation. 
*/ private volatile long elapsedTimeMillisThreshold; public static final Setting SETTING_ELAPSED_TIME_MILLIS_THRESHOLD = Setting.longSetting( @@ -52,7 +66,13 @@ private static class Defaults { public ElapsedTimeTracker(SearchBackpressureSettings settings, LongSupplier timeNanosSupplier) { this.timeNanosSupplier = timeNanosSupplier; + this.elapsedTimeMillisThresholdForSearchQuery = SETTING_ELAPSED_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY.get(settings.getSettings()); this.elapsedTimeMillisThreshold = SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.get(settings.getSettings()); + settings.getClusterSettings() + .addSettingsUpdateConsumer( + SETTING_ELAPSED_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY, + this::setElapsedTimeMillisThresholdForSearchQuery + ); settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, this::setElapsedTimeMillisThreshold); } @@ -64,7 +84,7 @@ public String name() { @Override public Optional checkAndMaybeGetCancellationReason(Task task) { long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos(); - long threshold = getElapsedTimeNanosThreshold(); + long threshold = (task instanceof SearchTask) ? 
getElapsedTimeNanosThresholdForSearchQuery() : getElapsedTimeNanosThreshold(); if (usage < threshold) { return Optional.empty(); @@ -82,20 +102,36 @@ public Optional checkAndMaybeGetCancellationReason(Task ); } + public long getElapsedTimeNanosThresholdForSearchQuery() { + return TimeUnit.MILLISECONDS.toNanos(elapsedTimeMillisThresholdForSearchQuery); + } + public long getElapsedTimeNanosThreshold() { return TimeUnit.MILLISECONDS.toNanos(elapsedTimeMillisThreshold); } + public void setElapsedTimeMillisThresholdForSearchQuery(long elapsedTimeMillisThresholdForSearchQuery) { + this.elapsedTimeMillisThresholdForSearchQuery = elapsedTimeMillisThresholdForSearchQuery; + } + public void setElapsedTimeMillisThreshold(long elapsedTimeMillisThreshold) { this.elapsedTimeMillisThreshold = elapsedTimeMillisThreshold; } @Override - public TaskResourceUsageTracker.Stats stats(List activeTasks) { + public TaskResourceUsageTracker.Stats searchTaskStats(List activeTasks) { + long now = timeNanosSupplier.getAsLong(); + long currentMax = activeTasks.stream().mapToLong(t -> now - t.getStartTimeNanos()).max().orElse(0); + long currentAvg = (long) activeTasks.stream().mapToLong(t -> now - t.getStartTimeNanos()).average().orElse(0); + return new Stats(getSearchTaskCancellationCount(), currentMax, currentAvg); + } + + @Override + public TaskResourceUsageTracker.Stats searchShardTaskStats(List activeTasks) { long now = timeNanosSupplier.getAsLong(); long currentMax = activeTasks.stream().mapToLong(t -> now - t.getStartTimeNanos()).max().orElse(0); long currentAvg = (long) activeTasks.stream().mapToLong(t -> now - t.getStartTimeNanos()).average().orElse(0); - return new Stats(getCancellations(), currentMax, currentAvg); + return new Stats(getSearchShardTaskCancellationCount(), currentMax, currentAvg); } /** diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java 
b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java index d1a264609e522..31f62055dbfc5 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java @@ -8,6 +8,7 @@ package org.opensearch.search.backpressure.trackers; +import org.opensearch.action.search.SearchTask; import org.opensearch.common.settings.Setting; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -37,13 +38,29 @@ public class HeapUsageTracker extends TaskResourceUsageTracker { private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); private static class Defaults { + private static final double HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY = 0.02; private static final double HEAP_PERCENT_THRESHOLD = 0.005; + private static final double HEAP_VARIANCE_THRESHOLD_FOR_SEARCH_QUERY = 2.0; private static final double HEAP_VARIANCE_THRESHOLD = 2.0; + private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE_FOR_SEARCH_QUERY = 100; private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100; } /** - * Defines the heap usage threshold (in percentage) for an individual task before it is considered for cancellation. + * Defines the heap usage threshold (in percentage) for an individual search task before it is considered for cancellation. + */ + private volatile double heapPercentThresholdForSearchQuery; + public static final Setting SETTING_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY = Setting.doubleSetting( + "search_backpressure.search_task.heap_percent_threshold_for_search_query", + Defaults.HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the heap usage threshold (in percentage) for an individual search shard task before it is considered for cancellation. 
*/ private volatile double heapPercentThreshold; public static final Setting SETTING_HEAP_PERCENT_THRESHOLD = Setting.doubleSetting( @@ -56,7 +73,20 @@ private static class Defaults { ); /** - * Defines the heap usage variance for an individual task before it is considered for cancellation. + * Defines the heap usage variance for an individual search task before it is considered for cancellation. + * A task is considered for cancellation when taskHeapUsage is greater than or equal to heapUsageMovingAverage * variance. + */ + private volatile double heapVarianceThresholdForSearchQuery; + public static final Setting SETTING_HEAP_VARIANCE_THRESHOLD_FOR_SEARCH_QUERY = Setting.doubleSetting( + "search_backpressure.search_task.heap_variance_for_search_query", + Defaults.HEAP_VARIANCE_THRESHOLD_FOR_SEARCH_QUERY, + 0.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the heap usage variance for an individual search shard task before it is considered for cancellation. * A task is considered for cancellation when taskHeapUsage is greater than or equal to heapUsageMovingAverage * variance. */ private volatile double heapVarianceThreshold; @@ -69,7 +99,19 @@ private static class Defaults { ); /** - * Defines the window size to calculate the moving average of heap usage of completed tasks. + * Defines the window size to calculate the moving average of heap usage of completed search tasks. + */ + private volatile int heapMovingAverageWindowSizeForSearchQuery; + public static final Setting SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE_FOR_SEARCH_QUERY = Setting.intSetting( + "search_backpressure.search_task.heap_moving_average_window_size_for_search_query", + Defaults.HEAP_MOVING_AVERAGE_WINDOW_SIZE_FOR_SEARCH_QUERY, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the window size to calculate the moving average of heap usage of completed search shard tasks. 
*/ private volatile int heapMovingAverageWindowSize; public static final Setting SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( @@ -80,19 +122,33 @@ private static class Defaults { Setting.Property.NodeScope ); + private final AtomicReference movingAverageReferenceForSearchQuery; private final AtomicReference movingAverageReference; public HeapUsageTracker(SearchBackpressureSettings settings) { + heapPercentThresholdForSearchQuery = SETTING_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY.get(settings.getSettings()); + settings.getClusterSettings() + .addSettingsUpdateConsumer(SETTING_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY, this::setHeapPercentThresholdForSearchQuery); heapPercentThreshold = SETTING_HEAP_PERCENT_THRESHOLD.get(settings.getSettings()); settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_HEAP_PERCENT_THRESHOLD, this::setHeapPercentThreshold); + heapVarianceThresholdForSearchQuery = SETTING_HEAP_VARIANCE_THRESHOLD_FOR_SEARCH_QUERY.get(settings.getSettings()); + settings.getClusterSettings() + .addSettingsUpdateConsumer(SETTING_HEAP_VARIANCE_THRESHOLD_FOR_SEARCH_QUERY, this::setHeapVarianceThresholdForSearchQuery); heapVarianceThreshold = SETTING_HEAP_VARIANCE_THRESHOLD.get(settings.getSettings()); settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_HEAP_VARIANCE_THRESHOLD, this::setHeapVarianceThreshold); + heapMovingAverageWindowSizeForSearchQuery = SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE_FOR_SEARCH_QUERY.get(settings.getSettings()); + settings.getClusterSettings() + .addSettingsUpdateConsumer( + SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE_FOR_SEARCH_QUERY, + this::setHeapMovingAverageWindowSizeForSearchQuery + ); heapMovingAverageWindowSize = SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.get(settings.getSettings()); settings.getClusterSettings() .addSettingsUpdateConsumer(SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, this::setHeapMovingAverageWindowSize); + this.movingAverageReferenceForSearchQuery = new AtomicReference<>(new
MovingAverage(heapMovingAverageWindowSizeForSearchQuery)); this.movingAverageReference = new AtomicReference<>(new MovingAverage(heapMovingAverageWindowSize)); } @@ -103,12 +159,18 @@ public String name() { @Override public void update(Task task) { - movingAverageReference.get().record(task.getTotalResourceStats().getMemoryInBytes()); + if (task instanceof SearchTask) { + movingAverageReferenceForSearchQuery.get().record(task.getTotalResourceStats().getMemoryInBytes()); + } else { + movingAverageReference.get().record(task.getTotalResourceStats().getMemoryInBytes()); + } } @Override public Optional checkAndMaybeGetCancellationReason(Task task) { - MovingAverage movingAverage = movingAverageReference.get(); + MovingAverage movingAverage = (task instanceof SearchTask) + ? movingAverageReferenceForSearchQuery.get() + : movingAverageReference.get(); // There haven't been enough measurements. if (movingAverage.isReady() == false) { @@ -117,9 +179,11 @@ public Optional checkAndMaybeGetCancellationReason(Task double currentUsage = task.getTotalResourceStats().getMemoryInBytes(); double averageUsage = movingAverage.getAverage(); - double allowedUsage = averageUsage * getHeapVarianceThreshold(); + double variance = (task instanceof SearchTask) ? getHeapVarianceThresholdForSearchQuery() : getHeapVarianceThreshold(); + double allowedUsage = averageUsage * variance; + double threshold = (task instanceof SearchTask) ?
getHeapBytesThresholdForSearchQuery() : getHeapBytesThreshold(); - if (currentUsage < getHeapBytesThreshold() || currentUsage < allowedUsage) { + if (currentUsage < threshold || currentUsage < allowedUsage) { return Optional.empty(); } @@ -131,32 +195,60 @@ public Optional checkAndMaybeGetCancellationReason(Task ); } + public long getHeapBytesThresholdForSearchQuery() { + return (long) (HEAP_SIZE_BYTES * heapPercentThresholdForSearchQuery); + } + public long getHeapBytesThreshold() { return (long) (HEAP_SIZE_BYTES * heapPercentThreshold); } + public void setHeapPercentThresholdForSearchQuery(double heapPercentThresholdForSearchQuery) { + this.heapPercentThresholdForSearchQuery = heapPercentThresholdForSearchQuery; + } + public void setHeapPercentThreshold(double heapPercentThreshold) { this.heapPercentThreshold = heapPercentThreshold; } + public double getHeapVarianceThresholdForSearchQuery() { + return heapVarianceThresholdForSearchQuery; + } + public double getHeapVarianceThreshold() { return heapVarianceThreshold; } + public void setHeapVarianceThresholdForSearchQuery(double heapVarianceThresholdForSearchQuery) { + this.heapVarianceThresholdForSearchQuery = heapVarianceThresholdForSearchQuery; + } + public void setHeapVarianceThreshold(double heapVarianceThreshold) { this.heapVarianceThreshold = heapVarianceThreshold; } + public void setHeapMovingAverageWindowSizeForSearchQuery(int heapMovingAverageWindowSizeForSearchQuery) { + this.heapMovingAverageWindowSizeForSearchQuery = heapMovingAverageWindowSizeForSearchQuery; + this.movingAverageReferenceForSearchQuery.set(new MovingAverage(heapMovingAverageWindowSizeForSearchQuery)); + } + public void setHeapMovingAverageWindowSize(int heapMovingAverageWindowSize) { this.heapMovingAverageWindowSize = heapMovingAverageWindowSize; this.movingAverageReference.set(new MovingAverage(heapMovingAverageWindowSize)); } @Override - public TaskResourceUsageTracker.Stats stats(List activeTasks) { + public 
TaskResourceUsageTracker.Stats searchTaskStats(List activeTasks) { + long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getMemoryInBytes()).max().orElse(0); + long currentAvg = (long) activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getMemoryInBytes()).average().orElse(0); + return new Stats(getSearchTaskCancellationCount(), currentMax, currentAvg, (long) movingAverageReferenceForSearchQuery.get().getAverage()); + } + + @Override + public TaskResourceUsageTracker.Stats searchShardTaskStats(List activeTasks) { long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getMemoryInBytes()).max().orElse(0); long currentAvg = (long) activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getMemoryInBytes()).average().orElse(0); - return new Stats(getCancellations(), currentMax, currentAvg, (long) movingAverageReference.get().getAverage()); + return new Stats(getSearchShardTaskCancellationCount(), currentMax, currentAvg, (long) movingAverageReference.get().getAverage()); } /** diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java index cbbb751b996be..a08ca34cd37bc 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java @@ -26,14 +26,23 @@ public abstract class TaskResourceUsageTracker { /** * Counts the number of cancellations made due to this tracker.
*/ - private final AtomicLong cancellations = new AtomicLong(); + private final AtomicLong searchTaskCancellationCount = new AtomicLong(); + private final AtomicLong searchShardTaskCancellationCount = new AtomicLong(); - public long incrementCancellations() { - return cancellations.incrementAndGet(); + public long incrementSearchTaskCancellations() { + return searchTaskCancellationCount.incrementAndGet(); } - public long getCancellations() { - return cancellations.get(); + public long incrementSearchShardTaskCancellations() { + return searchShardTaskCancellationCount.incrementAndGet(); + } + + public long getSearchTaskCancellationCount() { + return searchTaskCancellationCount.get(); + } + + public long getSearchShardTaskCancellationCount() { + return searchShardTaskCancellationCount.get(); } /** @@ -52,9 +61,14 @@ public void update(Task task) {} public abstract Optional checkAndMaybeGetCancellationReason(Task task); /** - * Returns the tracker's state as seen in the stats API. + * Returns the tracker's state for SearchTasks as seen in the stats API. + */ + public abstract Stats searchTaskStats(List activeTasks); + + /** + * Returns the tracker's state for SearchShardTasks as seen in the stats API. */ - public abstract Stats stats(List activeTasks); + public abstract Stats searchShardTaskStats(List activeTasks); /** * Represents the tracker's state as seen in the stats API. 
diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java index 07a962c6824ca..1285131bf5da8 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java @@ -16,6 +16,7 @@ import org.opensearch.search.backpressure.settings.SearchBackpressureMode; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.search.backpressure.stats.SearchTaskStats; import org.opensearch.search.backpressure.trackers.NodeDuressTracker; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.search.backpressure.stats.SearchBackpressureStats; @@ -120,7 +121,7 @@ public void testTrackerStateUpdateOnTaskCompletion() { for (int i = 0; i < 100; i++) { service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, 200)); } - assertEquals(100, service.getState().getCompletionCount()); + assertEquals(100, service.getSearchShardTasksState().getCompletionCount()); verify(mockTaskResourceUsageTracker, times(100)).update(any()); } @@ -150,8 +151,13 @@ public Optional checkAndMaybeGetCancellationReason(Task } @Override - public Stats stats(List activeTasks) { - return new MockStats(getCancellations()); + public Stats searchTaskStats(List activeTasks) { + return new MockStats(getSearchTaskCancellationCount()); + } + + @Override + public Stats searchShardTaskStats(List activeTasks) { + return new MockStats(getSearchShardTaskCancellationCount()); } }; @@ -200,13 +206,13 @@ public Stats stats(List activeTasks) { // There are 15 tasks eligible for cancellation but only 10 will be cancelled (burst limit). 
service.doRun(); - assertEquals(10, service.getState().getCancellationCount()); - assertEquals(1, service.getState().getLimitReachedCount()); + assertEquals(10, service.getSearchShardTasksState().getCancellationCount()); + assertEquals(1, service.getSearchShardTasksState().getLimitReachedCount()); // If the clock or completed task count haven't made sufficient progress, we'll continue to be rate-limited. service.doRun(); - assertEquals(10, service.getState().getCancellationCount()); - assertEquals(2, service.getState().getLimitReachedCount()); + assertEquals(10, service.getSearchShardTasksState().getCancellationCount()); + assertEquals(2, service.getSearchShardTasksState().getLimitReachedCount()); // Simulate task completion to replenish some tokens. // This will add 2 tokens (task count delta * cancellationRatio) to 'rateLimitPerTaskCompletion'. @@ -214,18 +220,19 @@ public Stats stats(List activeTasks) { service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes)); } service.doRun(); - assertEquals(12, service.getState().getCancellationCount()); - assertEquals(3, service.getState().getLimitReachedCount()); + assertEquals(12, service.getSearchShardTasksState().getCancellationCount()); + assertEquals(3, service.getSearchShardTasksState().getLimitReachedCount()); // Fast-forward the clock by one second to replenish some tokens. // This will add 3 tokens (time delta * rate) to 'rateLimitPerTime'. mockTime.addAndGet(TimeUnit.SECONDS.toNanos(1)); service.doRun(); - assertEquals(15, service.getState().getCancellationCount()); - assertEquals(3, service.getState().getLimitReachedCount()); // no more tasks to cancel; limit not reached + assertEquals(15, service.getSearchShardTasksState().getCancellationCount()); + assertEquals(3, service.getSearchShardTasksState().getLimitReachedCount()); // no more tasks to cancel; limit not reached // Verify search backpressure stats. 
SearchBackpressureStats expectedStats = new SearchBackpressureStats( + new SearchTaskStats(0, 0, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(0))), new SearchShardTaskStats(15, 3, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(15))), SearchBackpressureMode.ENFORCED ); diff --git a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchBackpressureStatsTests.java b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchBackpressureStatsTests.java index 2665a6d5e05aa..0c86cf4b11239 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchBackpressureStatsTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchBackpressureStatsTests.java @@ -25,6 +25,7 @@ protected SearchBackpressureStats createTestInstance() { public static SearchBackpressureStats randomInstance() { return new SearchBackpressureStats( + SearchTaskStatsTests.randomInstance(), SearchShardTaskStatsTests.randomInstance(), randomFrom(SearchBackpressureMode.DISABLED, SearchBackpressureMode.MONITOR_ONLY, SearchBackpressureMode.ENFORCED) ); diff --git a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java new file mode 100644 index 0000000000000..59375c22bb932 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search.backpressure.stats; + +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.search.backpressure.trackers.CpuUsageTracker; +import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; +import org.opensearch.search.backpressure.trackers.HeapUsageTracker; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.test.AbstractWireSerializingTestCase; + +import java.util.Map; + +public class SearchTaskStatsTests extends AbstractWireSerializingTestCase { + public static SearchTaskStats randomInstance() { + Map resourceUsageTrackerStats = Map.of( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + new CpuUsageTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), + TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, + new HeapUsageTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), + TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER, + new ElapsedTimeTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) + ); + + return new SearchTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats); + } + + @Override + protected Writeable.Reader instanceReader() { + return SearchTaskStats::new; + } + + @Override + protected SearchTaskStats createTestInstance() { + return randomInstance(); + } +} diff --git a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java index e74f89c905499..f30c15de28b90 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java @@ -27,16 +27,19 @@ public void testTaskCancellation() { TaskResourceUsageTracker mockTracker3 = 
createMockTaskResourceUsageTracker("mock_tracker_3"); List reasons = new ArrayList<>(); - List callbacks = List.of(mockTracker1::incrementCancellations, mockTracker2::incrementCancellations); + List callbacks = List.of( + mockTracker1::incrementSearchShardTaskCancellations, + mockTracker2::incrementSearchShardTaskCancellations + ); TaskCancellation taskCancellation = new TaskCancellation(mockTask, reasons, callbacks); // Task does not have any reason to be cancelled. assertEquals(0, taskCancellation.totalCancellationScore()); assertFalse(taskCancellation.isEligibleForCancellation()); taskCancellation.cancel(); - assertEquals(0, mockTracker1.getCancellations()); - assertEquals(0, mockTracker2.getCancellations()); - assertEquals(0, mockTracker3.getCancellations()); + assertEquals(0, mockTracker1.getSearchShardTaskCancellationCount()); + assertEquals(0, mockTracker2.getSearchShardTaskCancellationCount()); + assertEquals(0, mockTracker3.getSearchShardTaskCancellationCount()); // Task has one or more reasons to be cancelled. reasons.add(new TaskCancellation.Reason("limits exceeded 1", 10)); @@ -48,9 +51,9 @@ public void testTaskCancellation() { // Cancel the task and validate the cancellation reason and invocation of callbacks. 
taskCancellation.cancel(); assertTrue(mockTask.getReasonCancelled().contains("limits exceeded 1, limits exceeded 2, limits exceeded 3")); - assertEquals(1, mockTracker1.getCancellations()); - assertEquals(1, mockTracker2.getCancellations()); - assertEquals(0, mockTracker3.getCancellations()); + assertEquals(1, mockTracker1.getSearchShardTaskCancellationCount()); + assertEquals(1, mockTracker2.getSearchShardTaskCancellationCount()); + assertEquals(0, mockTracker3.getSearchShardTaskCancellationCount()); } private static TaskResourceUsageTracker createMockTaskResourceUsageTracker(String name) { @@ -69,7 +72,12 @@ public Optional checkAndMaybeGetCancellationReason(Task } @Override - public Stats stats(List activeTasks) { + public Stats searchTaskStats(List activeTasks) { + return null; + } + + @Override + public Stats searchShardTaskStats(List activeTasks) { return null; } };