From 2e9a0212e73d790652bef2dbf23e14858c5d68f3 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Tue, 10 Oct 2023 11:34:54 +0530 Subject: [PATCH 01/13] WIP Signed-off-by: Bhumika Saini --- .../admin/cluster/node/stats/NodeStats.java | 40 +++++++++++++++- .../SegmentReplicationPressureService.java | 4 ++ .../SegmentReplicationPressureStats.java | 48 +++++++++++++++++++ .../remote/RemoteStorePressureService.java | 6 +++ .../remote/RemoteStorePressureStats.java | 48 +++++++++++++++++++ .../java/org/opensearch/node/NodeService.java | 8 +++- 6 files changed, 151 insertions(+), 3 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/SegmentReplicationPressureStats.java create mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteStorePressureStats.java diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index dd36b3b8db3ab..bd6a3b389b358 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -46,6 +46,8 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.discovery.DiscoveryStats; import org.opensearch.http.HttpStats; +import org.opensearch.index.SegmentReplicationPressureStats; +import org.opensearch.index.remote.RemoteStorePressureStats; import org.opensearch.index.stats.IndexingPressureStats; import org.opensearch.index.stats.ShardIndexingPressureStats; import org.opensearch.index.store.remote.filecache.FileCacheStats; @@ -142,6 +144,12 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private SearchPipelineStats searchPipelineStats; + @Nullable + private RemoteStorePressureStats remoteStorePressureStats; + + @Nullable + private SegmentReplicationPressureStats segmentReplicationPressureStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -198,6 +206,11 @@ public NodeStats(StreamInput in) throws IOException { } else { searchPipelineStats = null; } + // TODO: change to V_2_11_0 on main after backport to 2.x + if (in.getVersion().onOrAfter(Version.CURRENT)) { + remoteStorePressureStats = in.readOptionalWriteable(RemoteStorePressureStats::new); + segmentReplicationPressureStats = in.readOptionalWriteable(SegmentReplicationPressureStats::new); + } } public NodeStats( @@ -224,7 +237,9 @@ public NodeStats( @Nullable WeightedRoutingStats weightedRoutingStats, @Nullable FileCacheStats fileCacheStats, @Nullable TaskCancellationStats taskCancellationStats, - @Nullable SearchPipelineStats searchPipelineStats + @Nullable SearchPipelineStats searchPipelineStats, + @Nullable RemoteStorePressureStats remoteStorePressureStats, + @Nullable SegmentReplicationPressureStats segmentReplicationPressureStats ) { super(node); this.timestamp = timestamp; @@ -250,6 +265,8 @@ public NodeStats( this.fileCacheStats = fileCacheStats; this.taskCancellationStats = taskCancellationStats; this.searchPipelineStats = searchPipelineStats; + this.remoteStorePressureStats = remoteStorePressureStats; + this.segmentReplicationPressureStats = segmentReplicationPressureStats; } public long getTimestamp() { @@ -387,6 +404,16 @@ public SearchPipelineStats getSearchPipelineStats() { return searchPipelineStats; } + @Nullable + public RemoteStorePressureStats getRemoteStorePressureStats() { + return remoteStorePressureStats; + } + + @Nullable + public SegmentReplicationPressureStats getSegmentReplicationPressureStats() { + return segmentReplicationPressureStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -430,6 +457,11 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_9_0)) { out.writeOptionalWriteable(searchPipelineStats); } + // TODO: change to V_2_11_0 on main after backport to 2.x + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalWriteable(remoteStorePressureStats); + out.writeOptionalWriteable(segmentReplicationPressureStats); + } } @Override @@ -520,6 +552,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getSearchPipelineStats() != null) { getSearchPipelineStats().toXContent(builder, params); } + if (getRemoteStorePressureStats() != null) { + getRemoteStorePressureStats().toXContent(builder, params); + } + if (getSegmentReplicationPressureStats() != null) { + getSegmentReplicationPressureStats().toXContent(builder, params); + } return builder; } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 4284daf9ffef4..8b06caa2e928b 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -179,6 +179,10 @@ public SegmentReplicationStats nodeStats() { return tracker.getStats(); } + public SegmentReplicationPressureStats pressureStats() { + return new SegmentReplicationPressureStats(tracker.getStatsForShard()); + } + public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) { return tracker.getStatsForShard(indexShard); } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureStats.java new file mode 100644 index 0000000000000..7ec8720b41a4b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureStats.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Stats for Segment Replication Backpressure + * + * @opensearch.internal + */ +public class SegmentReplicationPressureStats implements Writeable, ToXContentFragment { + private final long totalRejections; + + public SegmentReplicationPressureStats(long totalRejections) { + this.totalRejections = totalRejections; + } + + public SegmentReplicationPressureStats(StreamInput in) throws IOException { + totalRejections = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalRejections); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("segment_replication_pressure"); + builder.field("total_rejections", totalRejections); + builder.endObject(); // segment_replication_pressure + + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java index 2920b33921869..7a1b344696bbe 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java @@ -83,6 +83,12 @@ public void validateSegmentsUploadLag(ShardId shardId) { } } + RemoteStorePressureStats pressureStats() { + return new RemoteStorePressureStats( + remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).getRejectionCount() + ); + } + /** * Abstract class for validating if lag is acceptable or not. * diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureStats.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureStats.java new file mode 100644 index 0000000000000..eb5e77a04457b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureStats.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Stats for Remote Store Backpressure + * + * @opensearch.internal + */ +public class RemoteStorePressureStats implements Writeable, ToXContentFragment { + private final long totalRejections; + + public RemoteStorePressureStats(long totalRejections) { + this.totalRejections = totalRejections; + } + + public RemoteStorePressureStats(StreamInput in) throws IOException { + totalRejections = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalRejections); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("remote_store_pressure"); + builder.field("total_rejections", totalRejections); + builder.endObject(); // remote_store_pressure + + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 2688b894cb9a7..e3572c08a1fb1 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -217,7 +217,9 @@ public NodeStats stats( boolean weightedRoutingStats, boolean fileCacheStats, boolean taskCancellation, - boolean searchPipelineStats + boolean searchPipelineStats, + boolean remoteStorePressureStats, + boolean segmentReplicationPressureStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -245,7 +247,9 @@ public NodeStats stats( weightedRoutingStats ? WeightedRoutingStats.getInstance() : null, fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null, taskCancellation ? this.taskCancellationMonitoringService.stats() : null, - searchPipelineStats ? this.searchPipelineService.stats() : null + searchPipelineStats ? this.searchPipelineService.stats() : null, + remoteStorePressureStats ? . : null, + segmentReplicationPressureStats ? . : null ); } From 54af7f70ea40f8fba3a484fb67219c0751242825 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Tue, 10 Oct 2023 11:43:05 +0530 Subject: [PATCH 02/13] Update CHANGELOG Signed-off-by: Bhumika Saini --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a9e5bb3982708..3e6419c3d0dc6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Dependencies ### Changed +- [Remote Store] Add Remote Store backpressure rejection stats to `_nodes/stats` ([#10524](https://github.com/opensearch-project/OpenSearch/pull/10524)) ### Deprecated From 39f54e4ec58239175fbbf1eebba85d5db592044a Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Tue, 10 Oct 2023 16:20:33 +0530 Subject: [PATCH 03/13] Revert "WIP" This reverts commit 669a75f60b337d3e7b3d168808b0eb577a62507b. Signed-off-by: Bhumika Saini --- .../admin/cluster/node/stats/NodeStats.java | 40 +--------------- .../SegmentReplicationPressureService.java | 4 -- .../SegmentReplicationPressureStats.java | 48 ------------------- .../remote/RemoteStorePressureService.java | 6 --- .../remote/RemoteStorePressureStats.java | 48 ------------------- .../java/org/opensearch/node/NodeService.java | 8 +--- 6 files changed, 3 insertions(+), 151 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/index/SegmentReplicationPressureStats.java delete mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteStorePressureStats.java diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index bd6a3b389b358..dd36b3b8db3ab 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -46,8 +46,6 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.discovery.DiscoveryStats; import org.opensearch.http.HttpStats; -import org.opensearch.index.SegmentReplicationPressureStats; -import org.opensearch.index.remote.RemoteStorePressureStats; import org.opensearch.index.stats.IndexingPressureStats; import org.opensearch.index.stats.ShardIndexingPressureStats; import org.opensearch.index.store.remote.filecache.FileCacheStats; @@ -144,12 +142,6 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private SearchPipelineStats searchPipelineStats; - @Nullable - private RemoteStorePressureStats remoteStorePressureStats; - - @Nullable - private SegmentReplicationPressureStats segmentReplicationPressureStats; - public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -206,11 +198,6 @@ public NodeStats(StreamInput in) throws IOException { } else { searchPipelineStats = null; } - // TODO: change to V_2_11_0 on main after backport to 2.x - if (in.getVersion().onOrAfter(Version.CURRENT)) { - remoteStorePressureStats = in.readOptionalWriteable(RemoteStorePressureStats::new); - segmentReplicationPressureStats = in.readOptionalWriteable(SegmentReplicationPressureStats::new); - } } public NodeStats( @@ -237,9 +224,7 @@ public NodeStats( @Nullable WeightedRoutingStats weightedRoutingStats, @Nullable FileCacheStats fileCacheStats, @Nullable TaskCancellationStats taskCancellationStats, - @Nullable SearchPipelineStats searchPipelineStats, - @Nullable RemoteStorePressureStats remoteStorePressureStats, - @Nullable SegmentReplicationPressureStats segmentReplicationPressureStats + @Nullable SearchPipelineStats searchPipelineStats ) { super(node); this.timestamp = timestamp; @@ -265,8 +250,6 @@ public NodeStats( this.fileCacheStats = fileCacheStats; this.taskCancellationStats = taskCancellationStats; this.searchPipelineStats = searchPipelineStats; - this.remoteStorePressureStats = remoteStorePressureStats; - this.segmentReplicationPressureStats = segmentReplicationPressureStats; } public long getTimestamp() { @@ -404,16 +387,6 @@ public SearchPipelineStats getSearchPipelineStats() { return searchPipelineStats; } - @Nullable - public RemoteStorePressureStats getRemoteStorePressureStats() { - return remoteStorePressureStats; - } - - @Nullable - public SegmentReplicationPressureStats getSegmentReplicationPressureStats() { - return segmentReplicationPressureStats; - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -457,11 +430,6 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_9_0)) { out.writeOptionalWriteable(searchPipelineStats); } - // TODO: change to V_2_11_0 on main after backport to 2.x - if (out.getVersion().onOrAfter(Version.CURRENT)) { - out.writeOptionalWriteable(remoteStorePressureStats); - out.writeOptionalWriteable(segmentReplicationPressureStats); - } } @Override @@ -552,12 +520,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getSearchPipelineStats() != null) { getSearchPipelineStats().toXContent(builder, params); } - if (getRemoteStorePressureStats() != null) { - getRemoteStorePressureStats().toXContent(builder, params); - } - if (getSegmentReplicationPressureStats() != null) { - getSegmentReplicationPressureStats().toXContent(builder, params); - } return builder; } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 8b06caa2e928b..4284daf9ffef4 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -179,10 +179,6 @@ public SegmentReplicationStats nodeStats() { return tracker.getStats(); } - public SegmentReplicationPressureStats pressureStats() { - return new SegmentReplicationPressureStats(tracker.getStatsForShard()); - } - public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) { return tracker.getStatsForShard(indexShard); } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureStats.java deleted file mode 100644 index 7ec8720b41a4b..0000000000000 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureStats.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index; - -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.xcontent.ToXContentFragment; -import org.opensearch.core.xcontent.XContentBuilder; - -import java.io.IOException; - -/** - * Stats for Segment Replication Backpressure - * - * @opensearch.internal - */ -public class SegmentReplicationPressureStats implements Writeable, ToXContentFragment { - private final long totalRejections; - - public SegmentReplicationPressureStats(long totalRejections) { - this.totalRejections = totalRejections; - } - - public SegmentReplicationPressureStats(StreamInput in) throws IOException { - totalRejections = in.readVLong(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(totalRejections); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject("segment_replication_pressure"); - builder.field("total_rejections", totalRejections); - builder.endObject(); // segment_replication_pressure - - return builder; - } -} diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java index 7a1b344696bbe..2920b33921869 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java @@ -83,12 +83,6 @@ public void validateSegmentsUploadLag(ShardId shardId) { } } - RemoteStorePressureStats pressureStats() { - return new RemoteStorePressureStats( - remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).getRejectionCount() - ); - } - /** * Abstract class for validating if lag is acceptable or not. * diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureStats.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureStats.java deleted file mode 100644 index eb5e77a04457b..0000000000000 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureStats.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.remote; - -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.xcontent.ToXContentFragment; -import org.opensearch.core.xcontent.XContentBuilder; - -import java.io.IOException; - -/** - * Stats for Remote Store Backpressure - * - * @opensearch.internal - */ -public class RemoteStorePressureStats implements Writeable, ToXContentFragment { - private final long totalRejections; - - public RemoteStorePressureStats(long totalRejections) { - this.totalRejections = totalRejections; - } - - public RemoteStorePressureStats(StreamInput in) throws IOException { - totalRejections = in.readVLong(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(totalRejections); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject("remote_store_pressure"); - builder.field("total_rejections", totalRejections); - builder.endObject(); // remote_store_pressure - - return builder; - } -} diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index e3572c08a1fb1..2688b894cb9a7 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -217,9 +217,7 @@ public NodeStats stats( boolean weightedRoutingStats, boolean fileCacheStats, boolean taskCancellation, - boolean searchPipelineStats, - boolean remoteStorePressureStats, - boolean segmentReplicationPressureStats + boolean searchPipelineStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -247,9 +245,7 @@ public NodeStats stats( weightedRoutingStats ? WeightedRoutingStats.getInstance() : null, fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null, taskCancellation ? this.taskCancellationMonitoringService.stats() : null, - searchPipelineStats ? this.searchPipelineService.stats() : null, - remoteStorePressureStats ? . : null, - segmentReplicationPressureStats ? . : null + searchPipelineStats ? this.searchPipelineService.stats() : null ); } From a26b2741e6d689b29ffe231a3f7ab6b4d5a424ab Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Tue, 10 Oct 2023 17:31:29 +0530 Subject: [PATCH 04/13] Add total_rejections to remote store upload stats Signed-off-by: Bhumika Saini --- .../index/remote/RemoteSegmentStats.java | 51 ++++++++++++++++--- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java index c7863536adf20..aa57c78dca0e8 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java @@ -8,6 +8,7 @@ package org.opensearch.index.remote; +import org.opensearch.Version; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.StreamInput; @@ -75,6 +76,10 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment { * Total time spent in downloading segments from remote store */ private long totalDownloadTime; + /** + * Total rejections due to remote store upload backpressure + */ + private long totalRejections; public RemoteSegmentStats() {} @@ -90,6 +95,10 @@ public RemoteSegmentStats(StreamInput in) throws IOException { totalRefreshBytesLag = in.readLong(); totalUploadTime = in.readLong(); totalDownloadTime = in.readLong(); + // TODO: change to V_2_11_0 on main after backport to 2.x + if (in.getVersion().onOrAfter(Version.CURRENT)) { + totalRejections = in.readVLong(); + } } /** @@ -115,6 +124,7 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) { this.totalRefreshBytesLag = trackerStats.bytesLag; this.totalUploadTime = trackerStats.totalUploadTimeInMs; this.totalDownloadTime = trackerStats.directoryFileTransferTrackerStats.totalTransferTimeInMs; + this.totalRejections = trackerStats.rejectionCount; } // Getter and setters. All are visible for testing @@ -207,6 +217,14 @@ public void addTotalDownloadTime(long totalDownloadTime) { this.totalDownloadTime += totalDownloadTime; } + public long getTotalRejections() { + return totalRejections; + } + + public long addTotalRejections(long totalRejections) { + return this.totalRejections += totalRejections; + } + /** * Adds existing stats. Used for stats roll-ups at index or node level * @@ -225,6 +243,7 @@ public void add(RemoteSegmentStats existingStats) { this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag(); this.totalUploadTime += existingStats.getTotalUploadTime(); this.totalDownloadTime += existingStats.getTotalDownloadTime(); + this.totalRejections += existingStats.totalRejections; } } @@ -241,18 +260,26 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(totalRefreshBytesLag); out.writeLong(totalUploadTime); out.writeLong(totalDownloadTime); + // TODO: change to V_2_11_0 on main after backport to 2.x + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeVLong(totalRejections); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.REMOTE_STORE); + builder.startObject(Fields.UPLOAD); buildUploadStats(builder); - builder.endObject(); + builder.endObject(); // UPLOAD + builder.startObject(Fields.DOWNLOAD); buildDownloadStats(builder); - builder.endObject(); - builder.endObject(); + builder.endObject(); // DOWNLOAD + + builder.endObject(); // REMOTE_STORE + return builder; } @@ -261,13 +288,19 @@ private void buildUploadStats(XContentBuilder builder) throws IOException { builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(uploadBytesStarted)); builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(uploadBytesSucceeded)); builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(uploadBytesFailed)); - builder.endObject(); + builder.endObject(); // TOTAL_UPLOAD_SIZE + builder.startObject(Fields.REFRESH_SIZE_LAG); builder.humanReadableField(Fields.TOTAL_BYTES, Fields.TOTAL, new ByteSizeValue(totalRefreshBytesLag)); builder.humanReadableField(Fields.MAX_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag)); - builder.endObject(); + builder.endObject(); // REFRESH_SIZE_LAG + builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag)); builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalUploadTime)); + + builder.startObject(Fields.PRESSURE); + builder.field(Fields.TOTAL_REJECTIONS, totalRejections); + builder.endObject(); // PRESSURE } private void buildDownloadStats(XContentBuilder builder) throws IOException { @@ -300,6 +333,8 @@ static final class Fields { static final String MAX_BYTES = "max_bytes"; static final String TOTAL_TIME_SPENT = "total_time_spent"; static final String TOTAL_TIME_SPENT_IN_MILLIS = "total_time_spent_in_millis"; + static final String PRESSURE = "pressure"; + static final String TOTAL_REJECTIONS = "total_rejections"; } @Override @@ -318,7 +353,8 @@ public boolean equals(Object o) { && maxRefreshBytesLag == that.maxRefreshBytesLag && totalRefreshBytesLag == that.totalRefreshBytesLag && totalUploadTime == that.totalUploadTime - && totalDownloadTime == that.totalDownloadTime; + && totalDownloadTime == that.totalDownloadTime + && totalRejections == that.totalRejections; } @Override @@ -334,7 +370,8 @@ public int hashCode() { maxRefreshBytesLag, totalRefreshBytesLag, totalUploadTime, - totalDownloadTime + totalDownloadTime, + totalRejections ); } } From 1f88a95e442b9edf3899c9742c1a3f384ba90540 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Tue, 10 Oct 2023 18:49:16 +0530 Subject: [PATCH 05/13] Add total_rejections to replication stats Signed-off-by: Bhumika Saini --- .../opensearch/index/shard/IndexShardIT.java | 2 ++ .../org/opensearch/index/IndexService.java | 6 ++-- .../opensearch/index/ReplicationStats.java | 31 ++++++++++++++++++- .../opensearch/index/shard/IndexShard.java | 9 ++++-- .../opensearch/indices/IndicesService.java | 7 +++-- .../cluster/IndicesClusterStateService.java | 19 +++++++++--- ...dicesLifecycleListenerSingleNodeTests.java | 1 + ...actIndicesClusterStateServiceTestCase.java | 4 ++- ...ClusterStateServiceRandomUpdatesTests.java | 1 + .../snapshots/SnapshotResiliencyTests.java | 3 +- .../index/shard/IndexShardTestCase.java | 3 +- 11 files changed, 71 insertions(+), 15 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index c394a1f631690..467b4f5370c0e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -713,7 +713,9 @@ public static final IndexShard newIndexShard( null, () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, nodeId, + null, null + ); } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 84e8e2f41aaf1..72db46e700bd3 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -452,7 +452,8 @@ public synchronized IndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + final SegmentReplicationPressureService segmentReplicationPressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -525,7 +526,8 @@ public synchronized IndexShard createShard( remoteStoreStatsTrackerFactory, clusterRemoteTranslogBufferIntervalSupplier, nodeEnv.nodeId(), - recoverySettings + recoverySettings, + segmentReplicationPressureService ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/ReplicationStats.java b/server/src/main/java/org/opensearch/index/ReplicationStats.java index 9cc6685c75f80..3cc290dffa9fd 100644 --- a/server/src/main/java/org/opensearch/index/ReplicationStats.java +++ b/server/src/main/java/org/opensearch/index/ReplicationStats.java @@ -8,11 +8,13 @@ package org.opensearch.index; +import org.opensearch.Version; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; @@ -29,17 +31,26 @@ public class ReplicationStats implements ToXContentFragment, Writeable { public long maxBytesBehind; public long maxReplicationLag; public long totalBytesBehind; + public long totalRejections; + public ShardId shardId; - public ReplicationStats(long maxBytesBehind, long totalBytesBehind, long maxReplicationLag) { + public ReplicationStats(ShardId shardId, long maxBytesBehind, long totalBytesBehind, long maxReplicationLag, long totalRejections) { + this.shardId = shardId; this.maxBytesBehind = maxBytesBehind; this.totalBytesBehind = totalBytesBehind; this.maxReplicationLag = maxReplicationLag; + this.totalRejections = totalRejections; } public ReplicationStats(StreamInput in) throws IOException { this.maxBytesBehind = in.readVLong(); this.totalBytesBehind = in.readVLong(); this.maxReplicationLag = in.readVLong(); + // TODO: change to V_2_11_0 on main after backport to 2.x + if (in.getVersion().onOrAfter(Version.CURRENT)) { + this.totalRejections = in.readVLong(); + this.shardId = in.readOptionalWriteable(ShardId::new); + } } public ReplicationStats() { @@ -51,6 +62,10 @@ public void add(ReplicationStats other) { maxBytesBehind = Math.max(other.maxBytesBehind, maxBytesBehind); totalBytesBehind += other.totalBytesBehind; maxReplicationLag = Math.max(other.maxReplicationLag, maxReplicationLag); + // TODO + if (this.shardId != other.shardId) { + totalRejections += other.totalRejections; + } } } @@ -66,11 +81,20 @@ public long getMaxReplicationLag() { return this.maxReplicationLag; } + public long getTotalRejections() { + return totalRejections; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(maxBytesBehind); out.writeVLong(totalBytesBehind); out.writeVLong(maxReplicationLag); + // TODO: change to V_2_11_0 on main after backport to 2.x + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeVLong(totalRejections); + out.writeOptionalWriteable(shardId); + } } @Override @@ -79,6 +103,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.MAX_BYTES_BEHIND, new ByteSizeValue(maxBytesBehind).toString()); builder.field(Fields.TOTAL_BYTES_BEHIND, new ByteSizeValue(totalBytesBehind).toString()); builder.field(Fields.MAX_REPLICATION_LAG, new TimeValue(maxReplicationLag)); + + builder.startObject("pressure"); + builder.field("total_rejections", totalRejections); + builder.endObject(); + builder.endObject(); return builder; } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 9489c7d7fc1dd..ef1d0c82826af 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -110,6 +110,7 @@ import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.ReplicationStats; +import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.VersionType; import org.opensearch.index.cache.IndexCache; @@ -341,6 +342,7 @@ Runnable getGlobalCheckpointSyncer() { private final List internalRefreshListener = new ArrayList<>(); private final RemoteStoreFileDownloader fileDownloader; + private final SegmentReplicationPressureService segmentReplicationPressureService; public IndexShard( final ShardRouting shardRouting, @@ -369,7 +371,8 @@ public IndexShard( final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, final Supplier clusterRemoteTranslogBufferIntervalSupplier, final String nodeId, - final RecoverySettings recoverySettings + final RecoverySettings recoverySettings, + final SegmentReplicationPressureService segmentReplicationPressureService ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -466,6 +469,7 @@ public boolean shouldCache(Query query) { : mapperService.documentMapper().mappers().containsTimeStampField(); this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); + this.segmentReplicationPressureService = segmentReplicationPressureService; } public ThreadPool getThreadPool() { @@ -3000,7 +3004,8 @@ public ReplicationStats getReplicationStats() { .mapToLong(SegmentReplicationShardStats::getCurrentReplicationTimeMillis) .max() .orElse(0L); - return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag); + long totalRejections = segmentReplicationPressureService.getStatsForShard(this).getRejectedRequestCount(); + return new ReplicationStats(shardId, maxBytesBehind, totalBytesBehind, maxReplicationLag, totalRejections); } return new ReplicationStats(); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 50c551c2be29b..4b78c149f9ee0 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -102,6 +102,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.engine.CommitStats; @@ -990,7 +991,8 @@ public IndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + final SegmentReplicationPressureService segmentReplicationPressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); @@ -1002,7 +1004,8 @@ public IndexShard createShard( globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher, - remoteStoreStatsTrackerFactory + remoteStoreStatsTrackerFactory, + segmentReplicationPressureService ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index dc538a03de595..59c580cf24500 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -65,6 +65,7 @@ import org.opensearch.index.IndexComponent; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.ReplicationTracker; @@ -150,6 +151,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; + private final SegmentReplicationPressureService segmentReplicationPressureService; + @Inject public IndicesClusterStateService( final Settings settings, @@ -169,7 +172,8 @@ public IndicesClusterStateService( final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + final SegmentReplicationPressureService segmentReplicationPressureService ) { this( settings, @@ -189,7 +193,8 @@ public IndicesClusterStateService( primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, retentionLeaseSyncer, - remoteStoreStatsTrackerFactory + remoteStoreStatsTrackerFactory, + segmentReplicationPressureService ); } @@ -212,7 +217,8 @@ public IndicesClusterStateService( final PrimaryReplicaSyncer primaryReplicaSyncer, final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + final SegmentReplicationPressureService segmentReplicationPressureService ) { this.settings = settings; this.checkpointPublisher = checkpointPublisher; @@ -237,6 +243,7 @@ public IndicesClusterStateService( this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; + this.segmentReplicationPressureService = segmentReplicationPressureService; } @Override @@ -679,7 +686,8 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR retentionLeaseSyncer, nodes.getLocalNode(), sourceNode, - remoteStoreStatsTrackerFactory + remoteStoreStatsTrackerFactory, + segmentReplicationPressureService ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -1039,7 +1047,8 @@ T createShard( RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, - RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + SegmentReplicationPressureService segmentReplicationPressureService ) throws IOException; /** diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 5e6398da6fa1b..650ee6698b964 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -154,6 +154,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem s -> {}, RetentionLeaseSyncer.EMPTY, SegmentReplicationCheckpointPublisher.EMPTY, + null, null ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index c455101ff4549..9a6677a98eac9 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -46,6 +46,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.IndexEventListener; @@ -264,7 +265,8 @@ public MockIndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + final SegmentReplicationPressureService segmentReplicationPressureService ) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 22bf337b05598..7cc59aaaa9347 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -584,6 +584,7 @@ private IndicesClusterStateService createIndicesClusterStateService( primaryReplicaSyncer, s -> {}, RetentionLeaseSyncer.EMPTY, + null, null ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 97c5d23831965..af9b72de1fff3 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2137,7 +2137,8 @@ public void onFailure(final Exception e) { ), RetentionLeaseSyncer.EMPTY, SegmentReplicationCheckpointPublisher.EMPTY, - mock(RemoteStoreStatsTrackerFactory.class) + mock(RemoteStoreStatsTrackerFactory.class), + mock(SegmentReplicationPressureService.class) ); final SystemIndices systemIndices = new SystemIndices(emptyMap()); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 9dc230474482f..55455951746c9 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -702,7 +702,8 @@ protected IndexShard newShard( remoteStoreStatsTrackerFactory, () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, "dummy-node", - DefaultRecoverySettings.INSTANCE + DefaultRecoverySettings.INSTANCE, + null ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) { From 16ca31229fe814df4aa252fe301075a3608cc650 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Wed, 11 Oct 2023 11:14:36 +0530 Subject: [PATCH 06/13] Update UTs Signed-off-by: Bhumika Saini --- .../opensearch/index/shard/IndexShardIT.java | 1 - .../SegmentReplicationPressureService.java | 5 ++++ .../remote/RemoteSegmentTransferTracker.java | 3 ++- .../opensearch/index/shard/IndexShard.java | 5 ++++ .../index/shard/IndexShardTests.java | 26 +++++++++++++++++++ .../index/shard/IndexShardTestCase.java | 20 +++++++++----- 6 files changed, 52 insertions(+), 8 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 467b4f5370c0e..5c9944afb2fde 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -715,7 +715,6 @@ public static final IndexShard newIndexShard( nodeId, null, null - ); } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 4284daf9ffef4..da049e237d79f 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -135,6 +135,11 @@ public SegmentReplicationPressureService( this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(this); } + /** Only used for testing **/ + public SegmentReplicationStatsTracker getTracker() { + return tracker; + } + // visible for testing AsyncFailStaleReplicaTask getFailStaleReplicaTask() { return failStaleReplicaTask; diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index 05081180bb179..2a703f17aa953 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -267,7 +267,8 @@ public long getRejectionCount() { return rejectionCount.get(); } - void incrementRejectionCount() { + /** public only for testing **/ + public void incrementRejectionCount() { rejectionCount.incrementAndGet(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index ef1d0c82826af..d187d663110ca 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -563,6 +563,11 @@ protected RemoteStoreStatsTrackerFactory getRemoteStoreStatsTrackerFactory() { return remoteStoreStatsTrackerFactory; } + /** Only used for testing **/ + public SegmentReplicationPressureService getSegmentReplicationPressureService() { + return segmentReplicationPressureService; + } + public String getNodeId() { return translogConfig.getNodeId(); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index b2eb41828a4df..6f4ab8ab94be1 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -96,6 +96,8 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; +import org.opensearch.index.ReplicationStats; +import org.opensearch.index.SegmentReplicationStatsTracker; import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.CommitStats; import org.opensearch.index.engine.DocIdSeqNoAndSource; @@ -1827,6 +1829,7 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException { .getRemoteSegmentTransferTracker(shard.shardId); RemoteTranslogTransferTracker remoteTranslogTransferTracker = shard.getRemoteStoreStatsTrackerFactory() .getRemoteTranslogTransferTracker(shard.shardId); + populateSampleReplicationStats(shard); populateSampleRemoteSegmentStats(remoteSegmentTransferTracker); populateSampleRemoteTranslogStats(remoteTranslogTransferTracker); ShardStats shardStats = new ShardStats( @@ -1841,9 +1844,28 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException { assertRemoteSegmentStats(remoteSegmentTransferTracker, remoteSegmentStats); RemoteTranslogStats remoteTranslogStats = shardStats.getStats().getTranslog().getRemoteTranslogStats(); assertRemoteTranslogStats(remoteTranslogTransferTracker, remoteTranslogStats); + ReplicationStats replicationStats = shardStats.getStats().getSegments().getReplicationStats(); + assertReplicationStats(shard, replicationStats); closeShards(shard); } + private static void assertReplicationStats(IndexShard shard, ReplicationStats replicationStats) { + if (shard.isPrimaryMode()) { + assertEquals(5, replicationStats.totalRejections); + } else { + assertEquals(0, replicationStats.totalRejections); + } + } + + private static void populateSampleReplicationStats(IndexShard shard) { + if (shard.isPrimaryMode()) { + SegmentReplicationStatsTracker tracker = shard.getSegmentReplicationPressureService().getTracker(); + for (int i = 0; i < 5; i++) { + tracker.incrementRejectionCount(shard.shardId); + } + } + } + public void testRefreshMetric() throws IOException { IndexShard shard = newStartedShard(); // refresh on: finalize and end of recovery @@ -4910,6 +4932,8 @@ private void populateSampleRemoteSegmentStats(RemoteSegmentTransferTracker track tracker.addUploadBytesStarted(30L); tracker.addUploadBytesSucceeded(10L); tracker.addUploadBytesFailed(10L); + tracker.incrementRejectionCount(); + tracker.incrementRejectionCount(); } private void populateSampleRemoteTranslogStats(RemoteTranslogTransferTracker tracker) { @@ -4943,5 +4967,7 @@ private static void assertRemoteSegmentStats( assertEquals(remoteSegmentTransferTracker.getUploadBytesStarted(), remoteSegmentStats.getUploadBytesStarted()); assertEquals(remoteSegmentTransferTracker.getUploadBytesSucceeded(), remoteSegmentStats.getUploadBytesSucceeded()); assertEquals(remoteSegmentTransferTracker.getUploadBytesFailed(), remoteSegmentStats.getUploadBytesFailed()); + assertTrue(remoteSegmentStats.getTotalRejections() > 0); + assertEquals(remoteSegmentTransferTracker.getRejectionCount(), remoteSegmentStats.getTotalRejections()); } } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 55455951746c9..a8edf08f52372 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -37,12 +37,15 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.junit.Assert; +import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.TransportReplicationAction; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; @@ -85,6 +88,7 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.MapperTestUtils; +import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.VersionType; import org.opensearch.index.cache.IndexCache; import org.opensearch.index.cache.query.DisabledQueryCache; @@ -157,7 +161,6 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import org.junit.Assert; import java.io.IOException; import java.nio.file.Path; @@ -179,10 +182,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -import org.mockito.Mockito; - -import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; -import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -191,6 +190,8 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; /** * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily, @@ -643,6 +644,13 @@ protected IndexShard newShard( ); Store remoteStore; RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = null; + SegmentReplicationPressureService segmentReplicationPressureService = new SegmentReplicationPressureService( + Settings.EMPTY, + clusterService, + mock(IndicesService.class), + mock(ShardStateAction.class), + mock(ThreadPool.class) + ); RepositoriesService mockRepoSvc = mock(RepositoriesService.class); if (indexSettings.isRemoteStoreEnabled()) { @@ -703,7 +711,7 @@ protected IndexShard newShard( () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, "dummy-node", DefaultRecoverySettings.INSTANCE, - null + segmentReplicationPressureService ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) { From d20e09ad50cdc72594e87dd40e9cf8ff03ed0a94 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Wed, 11 Oct 2023 11:46:31 +0530 Subject: [PATCH 07/13] Apply spotless fix Signed-off-by: Bhumika Saini --- .../org/opensearch/index/shard/IndexShardTestCase.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index a8edf08f52372..ac720e8136473 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -37,8 +37,6 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.junit.Assert; -import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.admin.indices.flush.FlushRequest; @@ -161,6 +159,7 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import org.junit.Assert; import java.io.IOException; import java.nio.file.Path; @@ -182,6 +181,10 @@ import java.util.function.Function; import java.util.stream.Collectors; +import org.mockito.Mockito; + +import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -190,8 +193,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; -import static org.opensearch.test.ClusterServiceUtils.createClusterService; /** * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily, From b87b2584ff7e3d89f7008460e09f537ad617c236 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Wed, 11 Oct 2023 18:55:52 +0530 Subject: [PATCH 08/13] Change version check from Version.CURRENT to Version.V_3_0_0 Signed-off-by: Bhumika Saini --- .../src/main/java/org/opensearch/index/ReplicationStats.java | 5 ++--- .../java/org/opensearch/index/remote/RemoteSegmentStats.java | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/ReplicationStats.java b/server/src/main/java/org/opensearch/index/ReplicationStats.java index 3cc290dffa9fd..75fc714d65f9f 100644 --- a/server/src/main/java/org/opensearch/index/ReplicationStats.java +++ b/server/src/main/java/org/opensearch/index/ReplicationStats.java @@ -47,7 +47,7 @@ public ReplicationStats(StreamInput in) throws IOException { this.totalBytesBehind = in.readVLong(); this.maxReplicationLag = in.readVLong(); // TODO: change to V_2_11_0 on main after backport to 2.x - if (in.getVersion().onOrAfter(Version.CURRENT)) { + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { this.totalRejections = in.readVLong(); this.shardId = in.readOptionalWriteable(ShardId::new); } @@ -62,7 +62,6 @@ public void add(ReplicationStats other) { maxBytesBehind = Math.max(other.maxBytesBehind, maxBytesBehind); totalBytesBehind += other.totalBytesBehind; maxReplicationLag = Math.max(other.maxReplicationLag, maxReplicationLag); - // TODO if (this.shardId != other.shardId) { totalRejections += other.totalRejections; } @@ -91,7 +90,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(totalBytesBehind); out.writeVLong(maxReplicationLag); // TODO: change to V_2_11_0 on main after backport to 2.x - if (out.getVersion().onOrAfter(Version.CURRENT)) { + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeVLong(totalRejections); out.writeOptionalWriteable(shardId); } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java index aa57c78dca0e8..5b7cbafa6cc2c 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java @@ -96,7 +96,7 @@ public RemoteSegmentStats(StreamInput in) throws IOException { totalUploadTime = in.readLong(); totalDownloadTime = in.readLong(); // TODO: change to V_2_11_0 on main after backport to 2.x - if (in.getVersion().onOrAfter(Version.CURRENT)) { + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { totalRejections = in.readVLong(); } } @@ -261,7 +261,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(totalUploadTime); out.writeLong(totalDownloadTime); // TODO: change to V_2_11_0 on main after backport to 2.x - if (out.getVersion().onOrAfter(Version.CURRENT)) { + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeVLong(totalRejections); } } From 09076b9e4dca3a25835072e09931a7fc6ca97712 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Fri, 13 Oct 2023 16:19:07 +0530 Subject: [PATCH 09/13] Revert changes to add segrep BP rejection count Signed-off-by: Bhumika Saini --- .../opensearch/index/shard/IndexShardIT.java | 1 - .../org/opensearch/index/IndexService.java | 6 ++-- .../opensearch/index/ReplicationStats.java | 30 +------------------ .../SegmentReplicationPressureService.java | 5 ---- .../index/remote/RemoteSegmentStats.java | 4 +-- .../opensearch/index/shard/IndexShard.java | 14 ++------- .../opensearch/indices/IndicesService.java | 7 ++--- .../cluster/IndicesClusterStateService.java | 19 ++++-------- .../index/shard/IndexShardTests.java | 22 -------------- ...dicesLifecycleListenerSingleNodeTests.java | 1 - ...actIndicesClusterStateServiceTestCase.java | 4 +-- ...ClusterStateServiceRandomUpdatesTests.java | 1 - .../snapshots/SnapshotResiliencyTests.java | 20 ++++++------- .../index/shard/IndexShardTestCase.java | 21 ++++--------- 14 files changed, 29 insertions(+), 126 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 5c9944afb2fde..c394a1f631690 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -713,7 +713,6 @@ public static final IndexShard newIndexShard( null, () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, nodeId, - null, null ); } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 72db46e700bd3..84e8e2f41aaf1 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -452,8 +452,7 @@ public synchronized IndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - final SegmentReplicationPressureService segmentReplicationPressureService + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -526,8 +525,7 @@ public synchronized IndexShard createShard( remoteStoreStatsTrackerFactory, clusterRemoteTranslogBufferIntervalSupplier, nodeEnv.nodeId(), - recoverySettings, - segmentReplicationPressureService + recoverySettings ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/ReplicationStats.java b/server/src/main/java/org/opensearch/index/ReplicationStats.java index 75fc714d65f9f..9cc6685c75f80 100644 --- a/server/src/main/java/org/opensearch/index/ReplicationStats.java +++ b/server/src/main/java/org/opensearch/index/ReplicationStats.java @@ -8,13 +8,11 @@ package org.opensearch.index; -import org.opensearch.Version; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; @@ -31,26 +29,17 @@ public class ReplicationStats implements ToXContentFragment, Writeable { public long maxBytesBehind; public long maxReplicationLag; public long totalBytesBehind; - public long totalRejections; - public ShardId shardId; - public ReplicationStats(ShardId shardId, long maxBytesBehind, long totalBytesBehind, long maxReplicationLag, long totalRejections) { - this.shardId = shardId; + public ReplicationStats(long maxBytesBehind, long totalBytesBehind, long maxReplicationLag) { this.maxBytesBehind = maxBytesBehind; this.totalBytesBehind = totalBytesBehind; this.maxReplicationLag = maxReplicationLag; - this.totalRejections = totalRejections; } public ReplicationStats(StreamInput in) throws IOException { this.maxBytesBehind = in.readVLong(); this.totalBytesBehind = in.readVLong(); this.maxReplicationLag = in.readVLong(); - // TODO: change to V_2_11_0 on main after backport to 2.x - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { - this.totalRejections = in.readVLong(); - this.shardId = in.readOptionalWriteable(ShardId::new); - } } public ReplicationStats() { @@ -62,9 +51,6 @@ public void add(ReplicationStats other) { maxBytesBehind = Math.max(other.maxBytesBehind, maxBytesBehind); totalBytesBehind += other.totalBytesBehind; maxReplicationLag = Math.max(other.maxReplicationLag, maxReplicationLag); - if (this.shardId != other.shardId) { - totalRejections += other.totalRejections; - } } } @@ -80,20 +66,11 @@ public long getMaxReplicationLag() { return this.maxReplicationLag; } - public long getTotalRejections() { - return totalRejections; - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(maxBytesBehind); out.writeVLong(totalBytesBehind); out.writeVLong(maxReplicationLag); - // TODO: change to V_2_11_0 on main after backport to 2.x - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { - out.writeVLong(totalRejections); - out.writeOptionalWriteable(shardId); - } } @Override @@ -102,11 +79,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.MAX_BYTES_BEHIND, new ByteSizeValue(maxBytesBehind).toString()); builder.field(Fields.TOTAL_BYTES_BEHIND, new ByteSizeValue(totalBytesBehind).toString()); builder.field(Fields.MAX_REPLICATION_LAG, new TimeValue(maxReplicationLag)); - - builder.startObject("pressure"); - builder.field("total_rejections", totalRejections); - builder.endObject(); - builder.endObject(); return builder; } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index da049e237d79f..4284daf9ffef4 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -135,11 +135,6 @@ public SegmentReplicationPressureService( this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(this); } - /** Only used for testing **/ - public SegmentReplicationStatsTracker getTracker() { - return tracker; - } - // visible for testing AsyncFailStaleReplicaTask getFailStaleReplicaTask() { return failStaleReplicaTask; diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java index 5b7cbafa6cc2c..21d1c873b0ab9 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java @@ -95,7 +95,7 @@ public RemoteSegmentStats(StreamInput in) throws IOException { totalRefreshBytesLag = in.readLong(); totalUploadTime = in.readLong(); totalDownloadTime = in.readLong(); - // TODO: change to V_2_11_0 on main after backport to 2.x + // TODO: change to V_2_12_0 on main after backport to 2.x if (in.getVersion().onOrAfter(Version.V_3_0_0)) { totalRejections = in.readVLong(); } @@ -260,7 +260,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(totalRefreshBytesLag); out.writeLong(totalUploadTime); out.writeLong(totalDownloadTime); - // TODO: change to V_2_11_0 on main after backport to 2.x + // TODO: change to V_2_12_0 on main after backport to 2.x if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeVLong(totalRejections); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d187d663110ca..9489c7d7fc1dd 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -110,7 +110,6 @@ import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.ReplicationStats; -import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.VersionType; import org.opensearch.index.cache.IndexCache; @@ -342,7 +341,6 @@ Runnable getGlobalCheckpointSyncer() { private final List internalRefreshListener = new ArrayList<>(); private final RemoteStoreFileDownloader fileDownloader; - private final SegmentReplicationPressureService segmentReplicationPressureService; public IndexShard( final ShardRouting shardRouting, @@ -371,8 +369,7 @@ public IndexShard( final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, final Supplier clusterRemoteTranslogBufferIntervalSupplier, final String nodeId, - final RecoverySettings recoverySettings, - final SegmentReplicationPressureService segmentReplicationPressureService + final RecoverySettings recoverySettings ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -469,7 +466,6 @@ public boolean shouldCache(Query query) { : mapperService.documentMapper().mappers().containsTimeStampField(); this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); - this.segmentReplicationPressureService = segmentReplicationPressureService; } public ThreadPool getThreadPool() { @@ -563,11 +559,6 @@ protected RemoteStoreStatsTrackerFactory getRemoteStoreStatsTrackerFactory() { return remoteStoreStatsTrackerFactory; } - /** Only used for testing **/ - public SegmentReplicationPressureService getSegmentReplicationPressureService() { - return segmentReplicationPressureService; - } - public String getNodeId() { return translogConfig.getNodeId(); } @@ -3009,8 +3000,7 @@ public ReplicationStats getReplicationStats() { .mapToLong(SegmentReplicationShardStats::getCurrentReplicationTimeMillis) .max() .orElse(0L); - long totalRejections = segmentReplicationPressureService.getStatsForShard(this).getRejectedRequestCount(); - return new ReplicationStats(shardId, maxBytesBehind, totalBytesBehind, maxReplicationLag, totalRejections); + return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag); } return new ReplicationStats(); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 4b78c149f9ee0..50c551c2be29b 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -102,7 +102,6 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.engine.CommitStats; @@ -991,8 +990,7 @@ public IndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - final SegmentReplicationPressureService segmentReplicationPressureService + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); @@ -1004,8 +1002,7 @@ public IndexShard createShard( globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher, - remoteStoreStatsTrackerFactory, - segmentReplicationPressureService + remoteStoreStatsTrackerFactory ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 59c580cf24500..dc538a03de595 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -65,7 +65,6 @@ import org.opensearch.index.IndexComponent; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.ReplicationTracker; @@ -151,8 +150,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; - private final SegmentReplicationPressureService segmentReplicationPressureService; - @Inject public IndicesClusterStateService( final Settings settings, @@ -172,8 +169,7 @@ public IndicesClusterStateService( final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - final SegmentReplicationPressureService segmentReplicationPressureService + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) { this( settings, @@ -193,8 +189,7 @@ public IndicesClusterStateService( primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, retentionLeaseSyncer, - remoteStoreStatsTrackerFactory, - segmentReplicationPressureService + remoteStoreStatsTrackerFactory ); } @@ -217,8 +212,7 @@ public IndicesClusterStateService( final PrimaryReplicaSyncer primaryReplicaSyncer, final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - final SegmentReplicationPressureService segmentReplicationPressureService + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) { this.settings = settings; this.checkpointPublisher = checkpointPublisher; @@ -243,7 +237,6 @@ public IndicesClusterStateService( this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; - this.segmentReplicationPressureService = segmentReplicationPressureService; } @Override @@ -686,8 +679,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR retentionLeaseSyncer, nodes.getLocalNode(), sourceNode, - remoteStoreStatsTrackerFactory, - segmentReplicationPressureService + remoteStoreStatsTrackerFactory ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -1047,8 +1039,7 @@ T createShard( RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, - RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - SegmentReplicationPressureService segmentReplicationPressureService + RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) throws IOException; /** diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 6f4ab8ab94be1..9ef9bec01cb38 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -96,8 +96,6 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; -import org.opensearch.index.ReplicationStats; -import org.opensearch.index.SegmentReplicationStatsTracker; import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.CommitStats; import org.opensearch.index.engine.DocIdSeqNoAndSource; @@ -1829,7 +1827,6 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException { .getRemoteSegmentTransferTracker(shard.shardId); RemoteTranslogTransferTracker remoteTranslogTransferTracker = shard.getRemoteStoreStatsTrackerFactory() .getRemoteTranslogTransferTracker(shard.shardId); - populateSampleReplicationStats(shard); populateSampleRemoteSegmentStats(remoteSegmentTransferTracker); populateSampleRemoteTranslogStats(remoteTranslogTransferTracker); ShardStats shardStats = new ShardStats( @@ -1844,28 +1841,9 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException { assertRemoteSegmentStats(remoteSegmentTransferTracker, remoteSegmentStats); RemoteTranslogStats remoteTranslogStats = shardStats.getStats().getTranslog().getRemoteTranslogStats(); assertRemoteTranslogStats(remoteTranslogTransferTracker, remoteTranslogStats); - ReplicationStats replicationStats = shardStats.getStats().getSegments().getReplicationStats(); - assertReplicationStats(shard, replicationStats); closeShards(shard); } - private static void assertReplicationStats(IndexShard shard, ReplicationStats replicationStats) { - if (shard.isPrimaryMode()) { - assertEquals(5, replicationStats.totalRejections); - } else { - assertEquals(0, replicationStats.totalRejections); - } - } - - private static void populateSampleReplicationStats(IndexShard shard) { - if (shard.isPrimaryMode()) { - SegmentReplicationStatsTracker tracker = shard.getSegmentReplicationPressureService().getTracker(); - for (int i = 0; i < 5; i++) { - tracker.incrementRejectionCount(shard.shardId); - } - } - } - public void testRefreshMetric() throws IOException { IndexShard shard = newStartedShard(); // refresh on: finalize and end of recovery diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 650ee6698b964..5e6398da6fa1b 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -154,7 +154,6 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem s -> {}, RetentionLeaseSyncer.EMPTY, SegmentReplicationCheckpointPublisher.EMPTY, - null, null ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 9a6677a98eac9..c455101ff4549 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -46,7 +46,6 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.IndexEventListener; @@ -265,8 +264,7 @@ public MockIndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - final SegmentReplicationPressureService segmentReplicationPressureService + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 7cc59aaaa9347..22bf337b05598 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -584,7 +584,6 @@ private IndicesClusterStateService createIndicesClusterStateService( primaryReplicaSyncer, s -> {}, RetentionLeaseSyncer.EMPTY, - null, null ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index af9b72de1fff3..5634c24c7e631 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -34,6 +34,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.ActionModule.DynamicActionRegistry; @@ -232,8 +235,6 @@ import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestHandler; import org.opensearch.transport.TransportService; -import org.junit.After; -import org.junit.Before; import java.io.IOException; import java.nio.file.Path; @@ -257,15 +258,8 @@ import java.util.stream.IntStream; import java.util.stream.Stream; -import org.mockito.Mockito; - import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.opensearch.action.support.ActionTestUtils.assertNoFailureListener; -import static org.opensearch.env.Environment.PATH_HOME_SETTING; -import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; -import static org.opensearch.node.Node.NODE_NAME_SETTING; -import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.either; @@ -278,6 +272,11 @@ import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.action.support.ActionTestUtils.assertNoFailureListener; +import static org.opensearch.env.Environment.PATH_HOME_SETTING; +import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; +import static org.opensearch.node.Node.NODE_NAME_SETTING; +import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING; public class SnapshotResiliencyTests extends OpenSearchTestCase { @@ -2137,8 +2136,7 @@ public void onFailure(final Exception e) { ), RetentionLeaseSyncer.EMPTY, SegmentReplicationCheckpointPublisher.EMPTY, - mock(RemoteStoreStatsTrackerFactory.class), - mock(SegmentReplicationPressureService.class) + mock(RemoteStoreStatsTrackerFactory.class) ); final SystemIndices systemIndices = new SystemIndices(emptyMap()); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index ac720e8136473..4af47a603d364 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -37,13 +37,14 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.junit.Assert; +import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.TransportReplicationAction; -import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; @@ -86,7 +87,6 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.MapperTestUtils; -import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.VersionType; import org.opensearch.index.cache.IndexCache; import org.opensearch.index.cache.query.DisabledQueryCache; @@ -159,7 +159,6 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import org.junit.Assert; import java.io.IOException; import java.nio.file.Path; @@ -181,10 +180,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -import org.mockito.Mockito; - -import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; -import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -193,6 +188,8 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; /** * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily, @@ -645,13 +642,6 @@ protected IndexShard newShard( ); Store remoteStore; RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = null; - SegmentReplicationPressureService segmentReplicationPressureService = new SegmentReplicationPressureService( - Settings.EMPTY, - clusterService, - mock(IndicesService.class), - mock(ShardStateAction.class), - mock(ThreadPool.class) - ); RepositoriesService mockRepoSvc = mock(RepositoriesService.class); if (indexSettings.isRemoteStoreEnabled()) { @@ -711,8 +701,7 @@ protected IndexShard newShard( remoteStoreStatsTrackerFactory, () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, "dummy-node", - DefaultRecoverySettings.INSTANCE, - segmentReplicationPressureService + DefaultRecoverySettings.INSTANCE ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) { From e0cf2784d6229337d1443e57dad2832e0decd519 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Fri, 13 Oct 2023 16:36:44 +0530 Subject: [PATCH 10/13] Update tests Signed-off-by: Bhumika Saini --- .../index/remote/RemoteSegmentStats.java | 4 ++-- .../cluster/node/stats/NodeStatsTests.java | 1 + .../snapshots/SnapshotResiliencyTests.java | 17 +++++++++-------- .../index/shard/IndexShardTestCase.java | 9 +++++---- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java index 21d1c873b0ab9..5992923a4157b 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java @@ -221,8 +221,8 @@ public long getTotalRejections() { return totalRejections; } - public long addTotalRejections(long totalRejections) { - return this.totalRejections += totalRejections; + public void addTotalRejections(long totalRejections) { + this.totalRejections += totalRejections; } /** diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index e3f16463a5328..e0b35c69cc3c0 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -815,6 +815,7 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { remoteSegmentStats.setMaxRefreshTimeLag(2L); remoteSegmentStats.addTotalUploadTime(20L); remoteSegmentStats.addTotalDownloadTime(20L); + remoteSegmentStats.addTotalRejections(5L); RemoteTranslogStats remoteTranslogStats = indicesStats.getTranslog().getRemoteTranslogStats(); RemoteTranslogStats otherRemoteTranslogStats = new RemoteTranslogStats(getRandomRemoteTranslogTransferTrackerStats()); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 5634c24c7e631..97c5d23831965 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -34,9 +34,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.junit.After; -import org.junit.Before; -import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.ActionModule.DynamicActionRegistry; @@ -235,6 +232,8 @@ import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestHandler; import org.opensearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.nio.file.Path; @@ -258,8 +257,15 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import org.mockito.Mockito; + import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.opensearch.action.support.ActionTestUtils.assertNoFailureListener; +import static org.opensearch.env.Environment.PATH_HOME_SETTING; +import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; +import static org.opensearch.node.Node.NODE_NAME_SETTING; +import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.either; @@ -272,11 +278,6 @@ import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.opensearch.action.support.ActionTestUtils.assertNoFailureListener; -import static org.opensearch.env.Environment.PATH_HOME_SETTING; -import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; -import static org.opensearch.node.Node.NODE_NAME_SETTING; -import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING; public class SnapshotResiliencyTests extends OpenSearchTestCase { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 4af47a603d364..9dc230474482f 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -37,8 +37,6 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.junit.Assert; -import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.admin.indices.flush.FlushRequest; @@ -159,6 +157,7 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import org.junit.Assert; import java.io.IOException; import java.nio.file.Path; @@ -180,6 +179,10 @@ import java.util.function.Function; import java.util.stream.Collectors; +import org.mockito.Mockito; + +import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -188,8 +191,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; -import static org.opensearch.test.ClusterServiceUtils.createClusterService; /** * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily, From 1365951f564c4e17a312e5f02d943c3d38f18ae8 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Fri, 13 Oct 2023 17:02:00 +0530 Subject: [PATCH 11/13] Empty commit to retrigger build Signed-off-by: Bhumika Saini From 7107caa32dda8d8cf5314f4faf7bd306041ae524 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Fri, 13 Oct 2023 22:25:58 +0530 Subject: [PATCH 12/13] Empty commit to retrigger build Signed-off-by: Bhumika Saini From cdbe14d8c7fdf6cdb4fed7bca6b97ddd0081e606 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Fri, 13 Oct 2023 23:20:00 +0530 Subject: [PATCH 13/13] Empty commit to retrigger build Signed-off-by: Bhumika Saini