From ef6dafe3948bf5984063927c2b14c9597d6d9df3 Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Tue, 10 Oct 2023 14:30:25 -0700 Subject: [PATCH] add taskCompletionCount in search_backpressure stats (#10028) * add taskCompletionCount in search_backpressure Signed-off-by: Kaushal Kumar * address comments to use final objects Signed-off-by: Kaushal Kumar * do spotless check run Signed-off-by: Kaushal Kumar * use primitive long to store completionCount Signed-off-by: Kaushal Kumar * use primitive long to store completionCount in searchTaskStats Signed-off-by: Kaushal Kumar * add pr link against the change Signed-off-by: Kaushal Kumar * add taskCompletionCount in search_backpressure Signed-off-by: Kaushal Kumar * address comments to use final objects Signed-off-by: Kaushal Kumar * do spotless check run Signed-off-by: Kaushal Kumar * use primitive long to store completionCount Signed-off-by: Kaushal Kumar * rebase with upstream main Signed-off-by: Kaushal Kumar * rebase with upstream main Signed-off-by: Kaushal Kumar --------- Signed-off-by: Kaushal Kumar --- CHANGELOG.md | 2 ++ .../SearchBackpressureService.java | 2 ++ .../stats/SearchShardTaskStats.java | 18 +++++++++++++++++- .../backpressure/stats/SearchTaskStats.java | 18 +++++++++++++++++- .../SearchBackpressureServiceTests.java | 14 ++++++++------ .../stats/SearchShardTaskStatsTests.java | 7 ++++++- .../stats/SearchTaskStatsTests.java | 2 +- 7 files changed, 53 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05a3fa8cb785b..467e019270a47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,8 +54,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792)) - Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855)) +- Add task completion count in search backpressure stats API ([#10028](https://github.com/opensearch-project/OpenSearch/pull/10028/)) - Performance improvement for Datetime field caching ([#4558](https://github.com/opensearch-project/OpenSearch/issues/4558)) + ### Deprecated ### Removed diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index 4f6c2c327509d..29fbee416a9b8 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -399,6 +399,7 @@ public SearchBackpressureStats nodeStats() { SearchTaskStats searchTaskStats = new SearchTaskStats( searchBackpressureStates.get(SearchTask.class).getCancellationCount(), searchBackpressureStates.get(SearchTask.class).getLimitReachedCount(), + searchBackpressureStates.get(SearchTask.class).getCompletionCount(), taskTrackers.get(SearchTask.class) .stream() .collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchTasks))) @@ -407,6 +408,7 @@ public SearchBackpressureStats nodeStats() { SearchShardTaskStats searchShardTaskStats = new SearchShardTaskStats( searchBackpressureStates.get(SearchShardTask.class).getCancellationCount(), searchBackpressureStates.get(SearchShardTask.class).getLimitReachedCount(), + searchBackpressureStates.get(SearchShardTask.class).getCompletionCount(), taskTrackers.get(SearchShardTask.class) .stream() .collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks))) diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java index 678c19d83fb96..ffe97d125b27a 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java +++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java @@ -8,6 +8,7 @@ package org.opensearch.search.backpressure.stats; +import org.opensearch.Version; import org.opensearch.common.collect.MapBuilder; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -30,21 +31,29 @@ public class SearchShardTaskStats implements ToXContentObject, Writeable { private final long cancellationCount; private final long limitReachedCount; + private final long completionCount; private final Map resourceUsageTrackerStats; public SearchShardTaskStats( long cancellationCount, long limitReachedCount, + long completionCount, Map resourceUsageTrackerStats ) { this.cancellationCount = cancellationCount; this.limitReachedCount = limitReachedCount; + this.completionCount = completionCount; this.resourceUsageTrackerStats = resourceUsageTrackerStats; } public SearchShardTaskStats(StreamInput in) throws IOException { this.cancellationCount = in.readVLong(); this.limitReachedCount = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + completionCount = in.readVLong(); + } else { + completionCount = -1; + } MapBuilder builder = new MapBuilder<>(); builder.put(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, in.readOptionalWriteable(CpuUsageTracker.Stats::new)); @@ -62,6 +71,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(entry.getKey().getName(), entry.getValue()); } builder.endObject(); + if (completionCount != -1) { + builder.field("completion_count", completionCount); + } builder.startObject("cancellation_stats") .field("cancellation_count", cancellationCount) @@ -75,6 +87,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public void writeTo(StreamOutput out) throws IOException { out.writeVLong(cancellationCount); out.writeVLong(limitReachedCount); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeVLong(completionCount); + } out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER)); out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER)); @@ -88,11 +103,12 @@ public boolean equals(Object o) { SearchShardTaskStats that = (SearchShardTaskStats) o; return cancellationCount == that.cancellationCount && limitReachedCount == that.limitReachedCount + && completionCount == that.completionCount && resourceUsageTrackerStats.equals(that.resourceUsageTrackerStats); } @Override public int hashCode() { - return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats); + return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats, completionCount); } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java index 302350104bd3a..a7f9b4e3d004f 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java +++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java @@ -8,6 +8,7 @@ package org.opensearch.search.backpressure.stats; +import org.opensearch.Version; import org.opensearch.common.collect.MapBuilder; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -31,21 +32,29 @@ public class SearchTaskStats implements ToXContentObject, Writeable { private final long cancellationCount; private final long limitReachedCount; + private final long completionCount; private final Map resourceUsageTrackerStats; public SearchTaskStats( long cancellationCount, long limitReachedCount, + long completionCount, Map resourceUsageTrackerStats ) { this.cancellationCount = cancellationCount; this.limitReachedCount = limitReachedCount; + this.completionCount = completionCount; this.resourceUsageTrackerStats = resourceUsageTrackerStats; } public SearchTaskStats(StreamInput in) throws IOException { this.cancellationCount = in.readVLong(); this.limitReachedCount = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.completionCount = in.readVLong(); + } else { + this.completionCount = -1; + } MapBuilder builder = new MapBuilder<>(); builder.put(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, in.readOptionalWriteable(CpuUsageTracker.Stats::new)); @@ -63,6 +72,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(entry.getKey().getName(), entry.getValue()); } builder.endObject(); + if (completionCount != -1) { + builder.field("completion_count", completionCount); + } builder.startObject("cancellation_stats") .field("cancellation_count", cancellationCount) @@ -76,6 +88,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public void writeTo(StreamOutput out) throws IOException { out.writeVLong(cancellationCount); out.writeVLong(limitReachedCount); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeVLong(completionCount); + } out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER)); out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER)); @@ -89,11 +104,12 @@ public boolean equals(Object o) { SearchTaskStats that = (SearchTaskStats) o; return cancellationCount == that.cancellationCount && limitReachedCount == that.limitReachedCount + && completionCount == that.completionCount && resourceUsageTrackerStats.equals(that.resourceUsageTrackerStats); } @Override public int hashCode() { - return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats); + return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats, completionCount); } } diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java index f0d930c4c3acb..9778798b706f4 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java @@ -249,10 +249,11 @@ public void testSearchTaskInFlightCancellation() { verify(mockTaskManager, times(10)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); assertEquals(3, service.getSearchBackpressureState(SearchTask.class).getLimitReachedCount()); - // Verify search backpressure stats. + // Verify search backpressure stats. Since we are not marking any task as completed the completionCount will be 0 + // for SearchTaskStats here. SearchBackpressureStats expectedStats = new SearchBackpressureStats( - new SearchTaskStats(10, 3, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(10))), - new SearchShardTaskStats(0, 0, Collections.emptyMap()), + new SearchTaskStats(10, 3, 0, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(10))), + new SearchShardTaskStats(0, 0, 0, Collections.emptyMap()), SearchBackpressureMode.ENFORCED ); SearchBackpressureStats actualStats = service.nodeStats(); @@ -323,10 +324,11 @@ public void testSearchShardTaskInFlightCancellation() { verify(mockTaskManager, times(12)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); assertEquals(3, service.getSearchBackpressureState(SearchShardTask.class).getLimitReachedCount()); - // Verify search backpressure stats. + // Verify search backpressure stats. We are marking 20 SearchShardTasks as completed this should get + // reflected in SearchShardTaskStats. SearchBackpressureStats expectedStats = new SearchBackpressureStats( - new SearchTaskStats(0, 0, Collections.emptyMap()), - new SearchShardTaskStats(12, 3, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(12))), + new SearchTaskStats(0, 0, 0, Collections.emptyMap()), + new SearchShardTaskStats(12, 3, 20, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(12))), SearchBackpressureMode.ENFORCED ); SearchBackpressureStats actualStats = service.nodeStats(); diff --git a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java index 6478fdfff61d4..f28b82cad30d3 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java @@ -39,6 +39,11 @@ public static SearchShardTaskStats randomInstance() { new ElapsedTimeTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) ); - return new SearchShardTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats); + return new SearchShardTaskStats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + resourceUsageTrackerStats + ); } } diff --git a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java index eb33bc1c37b7e..cc7aa92826b41 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java @@ -40,6 +40,6 @@ public static SearchTaskStats randomInstance() { new ElapsedTimeTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) ); - return new SearchTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats); + return new SearchTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats); } }