Skip to content

Commit

Permalink
Task cancellation monitoring service (opensearch-project#7642)
Browse files Browse the repository at this point in the history
* Task cancellation monitoring service

Signed-off-by: Sagar Upadhyaya <[email protected]>
Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
sgup432 authored and imRishN committed Jun 27, 2023
1 parent 5424913 commit c43cdfe
Show file tree
Hide file tree
Showing 22 changed files with 1,016 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add task cancellation monitoring service ([#7642](https://github.com/opensearch-project/OpenSearch/pull/7642))
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.tasks.TaskCancellationStats;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;

Expand Down Expand Up @@ -134,6 +135,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private FileCacheStats fileCacheStats;

@Nullable
private TaskCancellationStats taskCancellationStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -180,6 +184,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
fileCacheStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
taskCancellationStats = in.readOptionalWriteable(TaskCancellationStats::new);
} else {
taskCancellationStats = null;
}
}

public NodeStats(
Expand All @@ -204,7 +213,8 @@ public NodeStats(
@Nullable SearchBackpressureStats searchBackpressureStats,
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
@Nullable WeightedRoutingStats weightedRoutingStats,
@Nullable FileCacheStats fileCacheStats
@Nullable FileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -228,6 +238,7 @@ public NodeStats(
this.clusterManagerThrottlingStats = clusterManagerThrottlingStats;
this.weightedRoutingStats = weightedRoutingStats;
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -355,6 +366,11 @@ public FileCacheStats getFileCacheStats() {
return fileCacheStats;
}

@Nullable
public TaskCancellationStats getTaskCancellationStats() {
return taskCancellationStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -392,6 +408,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeOptionalWriteable(fileCacheStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(taskCancellationStats);
}
}

@Override
Expand Down Expand Up @@ -476,6 +495,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getFileCacheStats() != null) {
getFileCacheStats().toXContent(builder, params);
}
if (getTaskCancellationStats() != null) {
getTaskCancellationStats().toXContent(builder, params);
}

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ public enum Metric {
SEARCH_BACKPRESSURE("search_backpressure"),
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"),
WEIGHTED_ROUTING_STATS("weighted_routing"),
FILE_CACHE_STATS("file_cache");
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics),
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics)
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.search.backpressure.settings.SearchTaskSettings;
import org.opensearch.tasks.TaskCancellationMonitoringSettings;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
Expand Down Expand Up @@ -649,7 +650,11 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT,
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE,
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE,
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE,

// Related to monitoring of task cancellation
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING
)
)
);
Expand Down
17 changes: 16 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.tasks.TaskCancellationMonitoringService;
import org.opensearch.tasks.TaskCancellationMonitoringSettings;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
Expand Down Expand Up @@ -972,6 +974,15 @@ protected Node(
client,
FeatureFlags.isEnabled(SEARCH_PIPELINE)
);
final TaskCancellationMonitoringSettings taskCancellationMonitoringSettings = new TaskCancellationMonitoringSettings(
settings,
clusterService.getClusterSettings()
);
final TaskCancellationMonitoringService taskCancellationMonitoringService = new TaskCancellationMonitoringService(
threadPool,
transportService.getTaskManager(),
taskCancellationMonitoringSettings
);
this.nodeService = new NodeService(
settings,
threadPool,
Expand All @@ -992,7 +1003,8 @@ protected Node(
searchModule.getValuesSourceRegistry().getUsageService(),
searchBackpressureService,
searchPipelineService,
fileCache
fileCache,
taskCancellationMonitoringService
);

final SearchService searchService = newSearchService(
Expand Down Expand Up @@ -1222,6 +1234,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(FsHealthService.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

Expand Down Expand Up @@ -1380,6 +1393,7 @@ private Node stop() {
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
nodeService.getTaskCancellationMonitoringService().stop();

pluginLifecycleComponents.forEach(LifecycleComponent::stop);
// we should stop this last since it waits for resources to get released
Expand Down Expand Up @@ -1443,6 +1457,7 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(SearchService.class));
toClose.add(() -> stopWatch.stop().start("transport"));
toClose.add(injector.getInstance(TransportService.class));
toClose.add(nodeService.getTaskCancellationMonitoringService());

for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
Expand Down
16 changes: 13 additions & 3 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.tasks.TaskCancellationMonitoringService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -90,6 +91,7 @@ public class NodeService implements Closeable {
private final ClusterService clusterService;
private final Discovery discovery;
private final FileCache fileCache;
private final TaskCancellationMonitoringService taskCancellationMonitoringService;

NodeService(
Settings settings,
Expand All @@ -111,7 +113,8 @@ public class NodeService implements Closeable {
AggregationUsageService aggregationUsageService,
SearchBackpressureService searchBackpressureService,
SearchPipelineService searchPipelineService,
FileCache fileCache
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -133,6 +136,7 @@ public class NodeService implements Closeable {
this.searchPipelineService = searchPipelineService;
this.clusterService = clusterService;
this.fileCache = fileCache;
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
}
Expand Down Expand Up @@ -211,7 +215,8 @@ public NodeStats stats(
boolean searchBackpressure,
boolean clusterManagerThrottling,
boolean weightedRoutingStats,
boolean fileCacheStats
boolean fileCacheStats,
boolean taskCancellation
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand All @@ -237,7 +242,8 @@ public NodeStats stats(
searchBackpressure ? this.searchBackpressureService.nodeStats() : null,
clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null,
weightedRoutingStats ? WeightedRoutingStats.getInstance() : null,
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null,
taskCancellation ? this.taskCancellationMonitoringService.stats() : null
);
}

Expand All @@ -253,6 +259,10 @@ public SearchBackpressureService getSearchBackpressureService() {
return searchBackpressureService;
}

public TaskCancellationMonitoringService getTaskCancellationMonitoringService() {
return taskCancellationMonitoringService;
}

@Override
public void close() throws IOException {
IOUtils.close(indicesService);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Holds monitoring service stats specific to search shard task.
*/
public class SearchShardTaskCancellationStats implements ToXContentObject, Writeable {

private final long currentLongRunningCancelledTaskCount;
private final long totalLongRunningCancelledTaskCount;

public SearchShardTaskCancellationStats(long currentTaskCount, long totalTaskCount) {
this.currentLongRunningCancelledTaskCount = currentTaskCount;
this.totalLongRunningCancelledTaskCount = totalTaskCount;
}

public SearchShardTaskCancellationStats(StreamInput in) throws IOException {
this.currentLongRunningCancelledTaskCount = in.readVLong();
this.totalLongRunningCancelledTaskCount = in.readVLong();
}

// package private for testing
protected long getCurrentLongRunningCancelledTaskCount() {
return this.currentLongRunningCancelledTaskCount;
}

// package private for testing
protected long getTotalLongRunningCancelledTaskCount() {
return this.totalLongRunningCancelledTaskCount;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("current_count_post_cancel", currentLongRunningCancelledTaskCount);
builder.field("total_count_post_cancel", totalLongRunningCancelledTaskCount);
return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(currentLongRunningCancelledTaskCount);
out.writeVLong(totalLongRunningCancelledTaskCount);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchShardTaskCancellationStats that = (SearchShardTaskCancellationStats) o;
return currentLongRunningCancelledTaskCount == that.currentLongRunningCancelledTaskCount
&& totalLongRunningCancelledTaskCount == that.totalLongRunningCancelledTaskCount;
}

@Override
public int hashCode() {
return Objects.hash(currentLongRunningCancelledTaskCount, totalLongRunningCancelledTaskCount);
}
}
Loading

0 comments on commit c43cdfe

Please sign in to comment.