Skip to content

Commit

Permalink
Cancellation of in-flight search requests at coordinator level
Browse files Browse the repository at this point in the history
Signed-off-by: PritLadani <[email protected]>
  • Loading branch information
PritLadani committed Dec 19, 2022
1 parent d76adf3 commit 1402ced
Show file tree
Hide file tree
Showing 14 changed files with 565 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.search.backpressure.settings.SearchTaskSettings;
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
Expand Down Expand Up @@ -601,11 +602,17 @@ public void apply(Settings value, Settings current, Settings previous) {
NodeDuressSettings.SETTING_CPU_THRESHOLD,
NodeDuressSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD,
HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY,
HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD,
HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD_FOR_SEARCH_QUERY,
HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD,
HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE_FOR_SEARCH_QUERY,
HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE,
CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY,
CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD,
ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD
ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD_FOR_SEARCH_QUERY,
ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD,
SearchTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchTask;
import org.opensearch.common.component.AbstractLifecycleComponent;
import org.opensearch.common.util.TokenBucket;
import org.opensearch.monitor.jvm.JvmStats;
Expand All @@ -20,6 +21,7 @@
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.search.backpressure.stats.SearchShardTaskStats;
import org.opensearch.search.backpressure.stats.SearchTaskStats;
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
Expand All @@ -37,7 +39,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -68,9 +72,12 @@ public class SearchBackpressureService extends AbstractLifecycleComponent
private final AtomicReference<TokenBucket> taskCancellationRateLimiter = new AtomicReference<>();
private final AtomicReference<TokenBucket> taskCancellationRatioLimiter = new AtomicReference<>();

// Currently, only the state of SearchShardTask is being tracked.
// This can be generalized to Map<TaskType, SearchBackpressureState> once we start supporting cancellation of SearchTasks as well.
private final SearchBackpressureState state = new SearchBackpressureState();
// Per-task-type backpressure state, keyed by the concrete task class.
// Built with Map.of rather than double-brace initialization: the latter creates an anonymous HashMap
// subclass that holds a hidden reference to the enclosing service. The map returned by Map.of is
// immutable, which matches how it is used here (lookups by class literal only).
private final Map<Class<? extends Task>, SearchBackpressureState> searchBackpressureStates = Map.of(
    SearchTask.class,
    new SearchBackpressureState(),
    SearchShardTask.class,
    new SearchBackpressureState()
);

public SearchBackpressureService(
SearchBackpressureSettings settings,
Expand Down Expand Up @@ -116,10 +123,15 @@ public SearchBackpressureService(
);

this.taskCancellationRatioLimiter.set(
new TokenBucket(state::getCompletionCount, getSettings().getCancellationRatio(), getSettings().getCancellationBurst())
new TokenBucket(this::getTaskCompletionCount, getSettings().getCancellationRatio(), getSettings().getCancellationBurst())
);
}

/**
 * Returns the sum of the completion counts recorded for SearchTasks and SearchShardTasks.
 */
private long getTaskCompletionCount() {
    final long searchTaskCompletions = searchBackpressureStates.get(SearchTask.class).getCompletionCount();
    final long searchShardTaskCompletions = searchBackpressureStates.get(SearchShardTask.class).getCompletionCount();
    return searchTaskCompletions + searchShardTaskCompletions;
}

void doRun() {
SearchBackpressureMode mode = getSettings().getMode();
if (mode == SearchBackpressureMode.DISABLED) {
Expand All @@ -130,18 +142,29 @@ void doRun() {
return;
}

// We are only targeting in-flight cancellation of SearchShardTask for now.
List<SearchShardTask> searchShardTasks = getSearchShardTasks();
List<CancellableTask> searchTasks = getSearchTasks();
List<CancellableTask> searchShardTasks = getSearchShardTasks();
List<CancellableTask> cancellableTasks = new ArrayList<>();

// Force-refresh usage stats of these tasks before making a cancellation decision.
taskResourceTrackingService.refreshResourceStats(searchTasks.toArray(new Task[0]));
taskResourceTrackingService.refreshResourceStats(searchShardTasks.toArray(new Task[0]));

// Skip cancellation if the increase in heap usage is not due to search requests.
if (isHeapUsageDominatedBySearch(searchShardTasks) == false) {
// Check if increase in heap usage is due to SearchTasks
if (isHeapUsageDominatedBySearch(searchTasks, getSettings().getSearchTaskSettings().getTotalHeapBytesThreshold())) {
cancellableTasks.addAll(searchTasks);
}

// Check if increase in heap usage is due to SearchShardTasks
if (isHeapUsageDominatedBySearch(searchShardTasks, getSettings().getSearchShardTaskSettings().getTotalHeapBytesThreshold())) {
cancellableTasks.addAll(searchShardTasks);
}

if (cancellableTasks.isEmpty()) {
return;
}

for (TaskCancellation taskCancellation : getTaskCancellations(searchShardTasks)) {
for (TaskCancellation taskCancellation : getTaskCancellations(cancellableTasks)) {
logger.debug(
"[{} mode] cancelling task [{}] due to high resource consumption [{}]",
mode.getName(),
Expand All @@ -160,7 +183,10 @@ void doRun() {
// Stop cancelling tasks if there are no tokens in either of the two token buckets.
if (rateLimitReached && ratioLimitReached) {
logger.debug("task cancellation limit reached");
state.incrementLimitReachedCount();
SearchBackpressureState searchBackpressureState = searchBackpressureStates.get(
(taskCancellation.getTask() instanceof SearchTask) ? SearchTask.class : SearchShardTask.class
);
searchBackpressureState.incrementLimitReachedCount();
break;
}

Expand All @@ -187,9 +213,8 @@ boolean isNodeInDuress() {
/**
* Returns true if the increase in heap usage is due to search requests.
*/
boolean isHeapUsageDominatedBySearch(List<SearchShardTask> searchShardTasks) {
long usage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum();
long threshold = getSettings().getSearchShardTaskSettings().getTotalHeapBytesThreshold();
boolean isHeapUsageDominatedBySearch(List<CancellableTask> cancellableTasks, long threshold) {
long usage = cancellableTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum();
if (usage < threshold) {
logger.debug("heap usage not dominated by search requests [{}/{}]", usage, threshold);
return false;
Expand All @@ -201,7 +226,7 @@ boolean isHeapUsageDominatedBySearch(List<SearchShardTask> searchShardTasks) {
/**
* Filters and returns the list of currently running SearchShardTasks.
*/
List<SearchShardTask> getSearchShardTasks() {
List<CancellableTask> getSearchShardTasks() {
return taskResourceTrackingService.getResourceAwareTasks()
.values()
.stream()
Expand All @@ -210,6 +235,18 @@ List<SearchShardTask> getSearchShardTasks() {
.collect(Collectors.toUnmodifiableList());
}

/**
 * Collects the resource-aware tasks that are instances of SearchTask.
 *
 * @return an unmodifiable list holding every SearchTask known to the resource tracking service
 */
List<CancellableTask> getSearchTasks() {
    return taskResourceTrackingService.getResourceAwareTasks()
        .values()
        .stream()
        .filter(SearchTask.class::isInstance)
        .map(SearchTask.class::cast)
        .collect(Collectors.toUnmodifiableList());
}

/**
* Returns a TaskCancellation wrapper containing the list of reasons (possibly zero), along with an overall
* cancellation score for the given task. Cancelling a task with a higher score has better chance of recovering the
Expand All @@ -222,13 +259,19 @@ TaskCancellation getTaskCancellation(CancellableTask task) {
for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) {
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
if (reason.isPresent()) {
if (task instanceof SearchTask) {
callbacks.add(tracker::incrementSearchTaskCancellations);
} else {
callbacks.add(tracker::incrementSearchShardTaskCancellations);
}
reasons.add(reason.get());
callbacks.add(tracker::incrementCancellations);
}
}

if (task instanceof SearchShardTask) {
callbacks.add(state::incrementCancellationCount);
if (task instanceof SearchTask) {
callbacks.add(searchBackpressureStates.get(SearchTask.class)::incrementCancellationCount);
} else {
callbacks.add(searchBackpressureStates.get(SearchShardTask.class)::incrementCancellationCount);
}

return new TaskCancellation(task, reasons, callbacks);
Expand All @@ -249,8 +292,12 @@ SearchBackpressureSettings getSettings() {
return settings;
}

SearchBackpressureState getState() {
return state;
/**
 * Returns the backpressure state registered under SearchTask.
 */
SearchBackpressureState getSearchTasksState() {
    final SearchBackpressureState searchTaskState = searchBackpressureStates.get(SearchTask.class);
    return searchTaskState;
}

/**
 * Returns the backpressure state registered under SearchShardTask.
 */
SearchBackpressureState getSearchShardTasksState() {
    final SearchBackpressureState searchShardTaskState = searchBackpressureStates.get(SearchShardTask.class);
    return searchShardTaskState;
}

@Override
Expand All @@ -259,19 +306,22 @@ public void onTaskCompleted(Task task) {
return;
}

if (task instanceof SearchShardTask == false) {
if (task instanceof SearchTask == false && task instanceof SearchShardTask == false) {
return;
}

SearchShardTask searchShardTask = (SearchShardTask) task;
if (searchShardTask.isCancelled() == false) {
state.incrementCompletionCount();
CancellableTask cancellableTask = (CancellableTask) task;
SearchBackpressureState searchBackpressureState = searchBackpressureStates.get(
(task instanceof SearchTask) ? SearchTask.class : SearchShardTask.class
);
if (cancellableTask.isCancelled() == false) {
searchBackpressureState.incrementCompletionCount();
}

List<Exception> exceptions = new ArrayList<>();
for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) {
try {
tracker.update(searchShardTask);
tracker.update(task);
} catch (Exception e) {
exceptions.add(e);
}
Expand All @@ -282,7 +332,7 @@ public void onTaskCompleted(Task task) {
/**
 * Replaces the ratio-based cancellation limiter with a new TokenBucket built from the
 * cancellation ratio and cancellation burst currently returned by the settings.
 */
@Override
public void onCancellationRatioChanged() {
taskCancellationRatioLimiter.set(
new TokenBucket(state::getCompletionCount, getSettings().getCancellationRatio(), getSettings().getCancellationBurst())
new TokenBucket(this::getTaskCompletionCount, getSettings().getCancellationRatio(), getSettings().getCancellationBurst())
);
}

Expand Down Expand Up @@ -321,15 +371,30 @@ protected void doStop() {
protected void doClose() throws IOException {}

/**
 * Assembles the search backpressure stats for this node.
 *
 * For each task type the stats carry the cancellation count, the limit-reached count and a map from
 * tracker type to the stats that tracker reports for the tasks of that type. The current
 * backpressure mode is included in the returned object.
 */
public SearchBackpressureStats nodeStats() {
List<SearchShardTask> searchShardTasks = getSearchShardTasks();
List<CancellableTask> searchTasks = getSearchTasks();
List<CancellableTask> searchShardTasks = getSearchShardTasks();

// Stats for SearchTasks: counters come from the state registered under SearchTask.
SearchTaskStats searchTaskStats = new SearchTaskStats(
searchBackpressureStates.get(SearchTask.class).getCancellationCount(),
searchBackpressureStates.get(SearchTask.class).getLimitReachedCount(),
taskResourceUsageTrackers.stream()
.collect(
Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.searchTaskStats(searchTasks))
)
);

// Stats for SearchShardTasks: same shape, keyed by tracker type.
SearchShardTaskStats searchShardTaskStats = new SearchShardTaskStats(
state.getCancellationCount(),
state.getLimitReachedCount(),
searchBackpressureStates.get(SearchShardTask.class).getCancellationCount(),
searchBackpressureStates.get(SearchShardTask.class).getLimitReachedCount(),
taskResourceUsageTrackers.stream()
.collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks)))
.collect(
Collectors.toUnmodifiableMap(
t -> TaskResourceUsageTrackerType.fromName(t.name()),
t -> t.searchShardTaskStats(searchShardTasks)
)
)
);

return new SearchBackpressureStats(searchShardTaskStats, getSettings().getMode());
return new SearchBackpressureStats(searchTaskStats, searchShardTaskStats, getSettings().getMode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,14 @@ public interface Listener {
private final Settings settings;
private final ClusterSettings clusterSettings;
private final NodeDuressSettings nodeDuressSettings;
private final SearchTaskSettings searchTaskSettings;
private final SearchShardTaskSettings searchShardTaskSettings;

public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSettings) {
this.settings = settings;
this.clusterSettings = clusterSettings;
this.nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings);
this.searchTaskSettings = new SearchTaskSettings(settings, clusterSettings);
this.searchShardTaskSettings = new SearchShardTaskSettings(settings, clusterSettings);

interval = new TimeValue(SETTING_INTERVAL_MILLIS.get(settings));
Expand Down Expand Up @@ -149,6 +151,10 @@ public NodeDuressSettings getNodeDuressSettings() {
return nodeDuressSettings;
}

/**
 * Returns the SearchTask settings instance created in the constructor.
 */
public SearchTaskSettings getSearchTaskSettings() {
    return this.searchTaskSettings;
}

/**
 * Returns the SearchShardTask settings instance created in the constructor.
 */
public SearchShardTaskSettings getSearchShardTaskSettings() {
    return this.searchShardTaskSettings;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.backpressure.settings;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.monitor.jvm.JvmStats;

/**
 * Settings that control when in-flight cancellation is applied to SearchTasks.
 *
 * @opensearch.internal
 */
public class SearchTaskSettings {
    /** Maximum JVM heap size in bytes, read once when this class is initialized. */
    private static final long MAX_HEAP_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes();

    /** Value used for the total heap threshold when the setting is not configured. */
    private static final double DEFAULT_TOTAL_HEAP_PERCENT_THRESHOLD = 0.05;

    /**
     * Heap usage threshold, expressed as a fraction of the maximum heap (bounded by 0.0 and 1.0), for the
     * sum of heap usages across all search tasks before in-flight cancellation is applied.
     */
    public static final Setting<Double> SETTING_TOTAL_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY = Setting.doubleSetting(
        "search_backpressure.search_task.total_heap_percent_threshold",
        DEFAULT_TOTAL_HEAP_PERCENT_THRESHOLD,
        0.0,
        1.0,
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    /** Current threshold value; volatile because it is reassigned by the settings update consumer. */
    private volatile double totalHeapPercentThreshold;

    /**
     * Reads the initial threshold from {@code settings} and registers a consumer so that later
     * updates to the setting replace the stored value.
     *
     * @param settings        source of the initial threshold value
     * @param clusterSettings registry on which the update consumer is installed
     */
    public SearchTaskSettings(Settings settings, ClusterSettings clusterSettings) {
        this.totalHeapPercentThreshold = SETTING_TOTAL_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY.get(settings);
        clusterSettings.addSettingsUpdateConsumer(
            SETTING_TOTAL_HEAP_PERCENT_THRESHOLD_FOR_SEARCH_QUERY,
            updatedThreshold -> this.totalHeapPercentThreshold = updatedThreshold
        );
    }

    /**
     * @return the currently stored total heap threshold
     */
    public double getTotalHeapPercentThreshold() {
        return this.totalHeapPercentThreshold;
    }

    /**
     * @return the current threshold multiplied by the maximum heap size, truncated to a long
     */
    public long getTotalHeapBytesThreshold() {
        final double thresholdFraction = getTotalHeapPercentThreshold();
        return (long) (MAX_HEAP_BYTES * thresholdFraction);
    }
}
Loading

0 comments on commit 1402ced

Please sign in to comment.