From bf9b3dc2b8c5c9fc8d5a194ffdcac1d1a71e9469 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 28 Mar 2023 01:02:38 +0000 Subject: [PATCH 01/10] Add new background task to fail stale replica shards. Signed-off-by: Rishikesh1159 --- .../index/SegmentReplicationPressureIT.java | 42 ++++++++ .../SegmentReplicationPressureService.java | 95 ++++++++++++++++++- ...egmentReplicationPressureServiceTests.java | 42 +++++++- .../snapshots/SnapshotResiliencyTests.java | 8 +- 4 files changed, 183 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java index ee5150c97fb4f..50b74a118e4a5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java @@ -16,6 +16,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.indices.replication.SegmentReplicationBaseIT; import org.opensearch.plugins.Plugin; import org.opensearch.rest.RestStatus; @@ -28,6 +29,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Arrays.asList; @@ -179,6 +181,46 @@ public void testBelowReplicaLimit() throws Exception { verifyStoreContent(); } + public void testFailStaleReplica() throws Exception { + + // Starts a primary and replica node. + final String primaryNode = internalCluster().startNode( + Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build() + ); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startNode( + Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build() + ); + ensureGreen(INDEX_NAME); + + final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); + final List replicaNodes = asList(replicaNode); + assertEqualSegmentInfosVersion(replicaNodes, primaryShard); + IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger totalDocs = new AtomicInteger(0); + try (final Releasable ignored = blockReplication(replicaNodes, latch)) { + // Index docs until we replicas are staled. + Thread indexingThread = new Thread(() -> { totalDocs.getAndSet(indexUntilCheckpointCount()); }); + indexingThread.start(); + indexingThread.join(); + latch.await(); + // index again while we are stale. + indexDoc(); + totalDocs.incrementAndGet(); + + // Verify that replica shard is closed. + assertBusy(() -> { assertTrue(replicaShard.state().equals(IndexShardState.CLOSED)); }, 1, TimeUnit.MINUTES); + } + ensureGreen(INDEX_NAME); + final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME); + + // Verify that new replica shard after failure is different from old replica shard. 
+ assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId()); + } + public void testBulkWritesRejected() throws Exception { final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME); diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index f31e236fb6184..b1e4c0387558a 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -10,18 +10,25 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.AbstractAsyncTask; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; +import org.opensearch.threadpool.ThreadPool; +import java.io.Closeable; +import java.io.IOException; import java.util.Set; +import java.util.Map; import java.util.stream.Collectors; /** @@ -29,7 +36,7 @@ * * @opensearch.internal */ -public class SegmentReplicationPressureService { +public class SegmentReplicationPressureService implements Closeable { private volatile boolean isSegmentReplicationBackpressureEnabled; private volatile int maxCheckpointsBehind; @@ -70,12 +77,30 @@ public class SegmentReplicationPressureService { ); private final IndicesService indicesService; + + private final ThreadPool threadPool; private final SegmentReplicationStatsTracker tracker; + private final ShardStateAction shardStateAction; + + public AsyncFailStaleReplicaTask getFailStaleReplicaTask() { + return failStaleReplicaTask; + } + + private volatile AsyncFailStaleReplicaTask failStaleReplicaTask; + @Inject - public SegmentReplicationPressureService(Settings settings, ClusterService clusterService, IndicesService indicesService) { + public SegmentReplicationPressureService( + Settings settings, + ClusterService clusterService, + IndicesService indicesService, + ShardStateAction shardStateAction, + ThreadPool threadPool + ) { this.indicesService = indicesService; this.tracker = new SegmentReplicationStatsTracker(this.indicesService); + this.shardStateAction = shardStateAction; + this.threadPool = threadPool; final ClusterSettings clusterSettings = clusterService.getClusterSettings(); this.isSegmentReplicationBackpressureEnabled = SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.get(settings); @@ -92,6 +117,8 @@ public SegmentReplicationPressureService(Settings settings, ClusterService clust this.maxAllowedStaleReplicas = MAX_ALLOWED_STALE_SHARDS.get(settings); clusterSettings.addSettingsUpdateConsumer(MAX_ALLOWED_STALE_SHARDS, this::setMaxAllowedStaleReplicas); + + this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(TimeValue.timeValueMillis(1)); } public void isSegrepLimitBreached(ShardId shardId) { @@ -154,4 +181,68 @@ public void setMaxAllowedStaleReplicas(double 
maxAllowedStaleReplicas) { public void setMaxReplicationTime(TimeValue maxReplicationTime) { this.maxReplicationTime = maxReplicationTime; } + + @Override + public void close() throws IOException { + failStaleReplicaTask.close(); + } + + // Background Task to fail replica shards if they are too far behind primary shard. + final class AsyncFailStaleReplicaTask extends AbstractAsyncTask { + + AsyncFailStaleReplicaTask(TimeValue interval) { + super(logger, threadPool, interval, true); + rescheduleIfNecessary(); + } + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + protected void runInternal() { + final SegmentReplicationStats stats = tracker.getStats(); + for (Map.Entry entry : stats.getShardStats().entrySet()) { + final Set staleReplicas = getStaleReplicas(entry.getValue().getReplicaStats()); + final ShardId shardId = entry.getKey(); + final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexShard primaryShard = indexService.getShard(shardId.getId()); + for (SegmentReplicationShardStats staleReplica : staleReplicas) { + if (staleReplica.getCurrentReplicationTimeMillis() > 2 * maxReplicationTime.millis()) { + shardStateAction.remoteShardFailed( + shardId, + staleReplica.getAllocationId(), + primaryShard.getOperationPrimaryTerm(), + true, + "replica too far behind primary, marking as stale", + null, + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.trace( + "Successfully failed remote shardId [{}] allocation id [{}]", + shardId, + staleReplica.getAllocationId() + ); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to send remote shard failure", e); + } + } + ); + } + } + } + } + + @Override + public String toString() { + return "fail_stale_replica"; + } + + } + } diff --git a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java index a050a4c2243db..3bc84c2c44be8 100644 --- a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java @@ -8,7 +8,9 @@ package org.opensearch.index; +import org.mockito.Mockito; import org.mockito.stubbing.Answer; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; @@ -21,6 +23,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.threadpool.ThreadPool; import java.util.Iterator; import java.util.List; @@ -29,13 +32,20 @@ import java.util.concurrent.TimeUnit; import static java.util.Arrays.asList; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING; import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED; public class 
SegmentReplicationPressureServiceTests extends OpenSearchIndexLevelReplicationTestCase { + private static ShardStateAction shardStateAction = Mockito.mock(ShardStateAction.class); private static final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) @@ -181,6 +191,36 @@ public void testIsSegrepLimitBreached_underStaleNodeLimit() throws Exception { } } + public void testFailStaleReplicaTask() throws Exception { + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(10)) + .build(); + + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primaryShard = shards.getPrimary(); + SegmentReplicationPressureService service = buildPressureService(settings, primaryShard); + + // index docs in batches without refreshing + indexInBatches(5, shards, primaryShard); + + // assert that replica shard is few checkpoints behind primary + Set replicationStats = primaryShard.getReplicationStats(); + assertEquals(1, replicationStats.size()); + SegmentReplicationShardStats shardStats = replicationStats.stream().findFirst().get(); + assertEquals(5, shardStats.getCheckpointsBehindCount()); + + // call the background task + service.getFailStaleReplicaTask().runInternal(); + + // verify that remote shard failed method is called which fails the replica shards falling behind. + verify(shardStateAction, times(1)).remoteShardFailed(any(), anyString(), anyLong(), anyBoolean(), anyString(), any(), any()); + replicateSegments(primaryShard, shards.getReplicas()); + } + } + private int indexInBatches(int count, ReplicationGroup shards, IndexShard primaryShard) throws Exception { int totalDocs = 0; for (int i = 0; i < count; i++) { @@ -202,6 +242,6 @@ private SegmentReplicationPressureService buildPressureService(Settings settings ClusterService clusterService = mock(ClusterService.class); when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - return new SegmentReplicationPressureService(settings, clusterService, indicesService); + return new SegmentReplicationPressureService(settings, clusterService, indicesService, shardStateAction, mock(ThreadPool.class)); } } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index efaab9e11d644..7e7ceeb890cc4 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1980,7 +1980,13 @@ public void onFailure(final Exception e) { new UpdateHelper(scriptService), actionFilters, new IndexingPressureService(settings, clusterService), - new SegmentReplicationPressureService(settings, clusterService, mock(IndicesService.class)), + new SegmentReplicationPressureService( + settings, + clusterService, + mock(IndicesService.class), + mock(ShardStateAction.class), + mock(ThreadPool.class) + ), new SystemIndices(emptyMap()) ); actions.put( From 05897cc90ed22add4afd3325f3090e89f1af324d Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 28 Mar 2023 01:20:07 +0000 Subject: [PATCH 02/10] Add 
condition to check if backpressure is enabled. Signed-off-by: Rishikesh1159 --- .../SegmentReplicationPressureService.java | 62 ++++++++++--------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index b1e4c0387558a..467134a2fc0cd 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -202,37 +202,39 @@ protected boolean mustReschedule() { @Override protected void runInternal() { - final SegmentReplicationStats stats = tracker.getStats(); - for (Map.Entry entry : stats.getShardStats().entrySet()) { - final Set staleReplicas = getStaleReplicas(entry.getValue().getReplicaStats()); - final ShardId shardId = entry.getKey(); - final IndexService indexService = indicesService.indexService(shardId.getIndex()); - final IndexShard primaryShard = indexService.getShard(shardId.getId()); - for (SegmentReplicationShardStats staleReplica : staleReplicas) { - if (staleReplica.getCurrentReplicationTimeMillis() > 2 * maxReplicationTime.millis()) { - shardStateAction.remoteShardFailed( - shardId, - staleReplica.getAllocationId(), - primaryShard.getOperationPrimaryTerm(), - true, - "replica too far behind primary, marking as stale", - null, - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.trace( - "Successfully failed remote shardId [{}] allocation id [{}]", - shardId, - staleReplica.getAllocationId() - ); + if (isSegmentReplicationBackpressureEnabled) { + final SegmentReplicationStats stats = tracker.getStats(); + for (Map.Entry entry : stats.getShardStats().entrySet()) { + final Set staleReplicas = getStaleReplicas(entry.getValue().getReplicaStats()); + final ShardId shardId = entry.getKey(); + final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexShard primaryShard = indexService.getShard(shardId.getId()); + for (SegmentReplicationShardStats staleReplica : staleReplicas) { + if (staleReplica.getCurrentReplicationTimeMillis() > 2 * maxReplicationTime.millis()) { + shardStateAction.remoteShardFailed( + shardId, + staleReplica.getAllocationId(), + primaryShard.getOperationPrimaryTerm(), + true, + "replica too far behind primary, marking as stale", + null, + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.trace( + "Successfully failed remote shardId [{}] allocation id [{}]", + shardId, + staleReplica.getAllocationId() + ); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to send remote shard failure", e); + } } - - @Override - public void onFailure(Exception e) { - logger.error("Failed to send remote shard failure", e); - } - } - ); + ); + } } } } From dd34f0fe83340e09d699247dc38b62d8cf14a594 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 28 Mar 2023 04:59:24 +0000 Subject: [PATCH 03/10] Fix failing tests. 
Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/SegmentReplicationPressureService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 467134a2fc0cd..e0d9c94edcdd0 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -118,7 +118,7 @@ public SegmentReplicationPressureService( this.maxAllowedStaleReplicas = MAX_ALLOWED_STALE_SHARDS.get(settings); clusterSettings.addSettingsUpdateConsumer(MAX_ALLOWED_STALE_SHARDS, this::setMaxAllowedStaleReplicas); - this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(TimeValue.timeValueMillis(1)); + this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(TimeValue.timeValueSeconds(30)); } public void isSegrepLimitBreached(ShardId shardId) { From d98cef75e302c37a442d282200ba8b72d9cfc300 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 29 Mar 2023 18:37:21 +0000 Subject: [PATCH 04/10] Fix failing tests by adding manual refresh. Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/SegmentReplicationPressureIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java index c884f4742c5c6..70fb0d417a0d1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java @@ -223,13 +223,14 @@ public void testFailStaleReplica() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger totalDocs = new AtomicInteger(0); try (final Releasable ignored = blockReplication(replicaNodes, latch)) { - // Index docs until we replicas are staled. + // Index docs until replicas are staled. Thread indexingThread = new Thread(() -> { totalDocs.getAndSet(indexUntilCheckpointCount()); }); indexingThread.start(); indexingThread.join(); latch.await(); // index again while we are stale. indexDoc(); + refresh(INDEX_NAME); totalDocs.incrementAndGet(); // Verify that replica shard is closed. From 282ca4ffe7ea35f3cd5e7ff4ffa61b52bed124a0 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 29 Mar 2023 18:41:18 +0000 Subject: [PATCH 05/10] Address comments on PR. Signed-off-by: Rishikesh1159 --- .../opensearch/index/SegmentReplicationPressureIT.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java index 70fb0d417a0d1..e5d164efccf2c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java @@ -204,15 +204,12 @@ public void testBelowReplicaLimit() throws Exception { public void testFailStaleReplica() throws Exception { + Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build(); // Starts a primary and replica node. 
- final String primaryNode = internalCluster().startNode( - Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build() - ); + final String primaryNode = internalCluster().startNode(settings); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replicaNode = internalCluster().startNode( - Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build() - ); + final String replicaNode = internalCluster().startNode(settings); ensureGreen(INDEX_NAME); final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); From 04481da46b78476cef4c382a213589278a01569d Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 30 Mar 2023 05:23:53 +0000 Subject: [PATCH 06/10] Addressing comments on PR. Signed-off-by: Rishikesh1159 --- .../SegmentReplicationPressureService.java | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index e0d9c94edcdd0..ef6b1db7371b7 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -118,7 +118,7 @@ public SegmentReplicationPressureService( this.maxAllowedStaleReplicas = MAX_ALLOWED_STALE_SHARDS.get(settings); clusterSettings.addSettingsUpdateConsumer(MAX_ALLOWED_STALE_SHARDS, this::setMaxAllowedStaleReplicas); - this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(TimeValue.timeValueSeconds(30)); + this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(this, TimeValue.timeValueSeconds(30)); } public void isSegrepLimitBreached(ShardId shardId) { @@ -188,10 +188,13 @@ public void close() throws IOException { } // Background Task to fail replica shards if they are too far behind primary shard. 
- final class AsyncFailStaleReplicaTask extends AbstractAsyncTask { + final static class AsyncFailStaleReplicaTask extends AbstractAsyncTask { - AsyncFailStaleReplicaTask(TimeValue interval) { - super(logger, threadPool, interval, true); + final SegmentReplicationPressureService pressureService; + + AsyncFailStaleReplicaTask(SegmentReplicationPressureService pressureService, TimeValue interval) { + super(logger, pressureService.threadPool, interval, true); + this.pressureService = pressureService; rescheduleIfNecessary(); } @@ -202,16 +205,18 @@ protected boolean mustReschedule() { @Override protected void runInternal() { - if (isSegmentReplicationBackpressureEnabled) { - final SegmentReplicationStats stats = tracker.getStats(); + if (pressureService.isSegmentReplicationBackpressureEnabled) { + final SegmentReplicationStats stats = pressureService.tracker.getStats(); for (Map.Entry entry : stats.getShardStats().entrySet()) { - final Set staleReplicas = getStaleReplicas(entry.getValue().getReplicaStats()); + final Set staleReplicas = pressureService.getStaleReplicas( + entry.getValue().getReplicaStats() + ); final ShardId shardId = entry.getKey(); - final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex()); final IndexShard primaryShard = indexService.getShard(shardId.getId()); for (SegmentReplicationShardStats staleReplica : staleReplicas) { - if (staleReplica.getCurrentReplicationTimeMillis() > 2 * maxReplicationTime.millis()) { - shardStateAction.remoteShardFailed( + if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) { + pressureService.shardStateAction.remoteShardFailed( shardId, staleReplica.getAllocationId(), primaryShard.getOperationPrimaryTerm(), @@ -240,6 +245,11 @@ public void onFailure(Exception e) { } } + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + @Override public String toString() { return "fail_stale_replica"; From d15bedb33be2f5db2e3d90d5494508a3e807ea26 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 3 Apr 2023 23:29:41 +0000 Subject: [PATCH 07/10] Update background task logic to fail stale replicas of only one shardId's in a single iteration of background task. Signed-off-by: Rishikesh1159 --- .../index/SegmentReplicationPressureIT.java | 4 +- .../SegmentReplicationPressureService.java | 40 ++++++++++++++----- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java index e5d164efccf2c..35d6a9ef0ef1d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java @@ -221,9 +221,7 @@ public void testFailStaleReplica() throws Exception { final AtomicInteger totalDocs = new AtomicInteger(0); try (final Releasable ignored = blockReplication(replicaNodes, latch)) { // Index docs until replicas are staled. - Thread indexingThread = new Thread(() -> { totalDocs.getAndSet(indexUntilCheckpointCount()); }); - indexingThread.start(); - indexingThread.join(); + totalDocs.getAndSet(indexUntilCheckpointCount()); latch.await(); // index again while we are stale. 
indexDoc(); diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index ef6b1db7371b7..8e5fef2982607 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -80,14 +80,9 @@ public class SegmentReplicationPressureService implements Closeable { private final ThreadPool threadPool; private final SegmentReplicationStatsTracker tracker; - private final ShardStateAction shardStateAction; - public AsyncFailStaleReplicaTask getFailStaleReplicaTask() { - return failStaleReplicaTask; - } - - private volatile AsyncFailStaleReplicaTask failStaleReplicaTask; + private final AsyncFailStaleReplicaTask failStaleReplicaTask; @Inject public SegmentReplicationPressureService( @@ -118,7 +113,12 @@ public SegmentReplicationPressureService( this.maxAllowedStaleReplicas = MAX_ALLOWED_STALE_SHARDS.get(settings); clusterSettings.addSettingsUpdateConsumer(MAX_ALLOWED_STALE_SHARDS, this::setMaxAllowedStaleReplicas); - this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(this, TimeValue.timeValueSeconds(30)); + this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(this); + } + + // visible for testing + AsyncFailStaleReplicaTask getFailStaleReplicaTask() { + return failStaleReplicaTask; } public void isSegrepLimitBreached(ShardId shardId) { @@ -192,8 +192,10 @@ final static class AsyncFailStaleReplicaTask extends AbstractAsyncTask { final SegmentReplicationPressureService pressureService; - AsyncFailStaleReplicaTask(SegmentReplicationPressureService pressureService, TimeValue interval) { - super(logger, pressureService.threadPool, interval, true); + static final TimeValue INTERVAL = TimeValue.timeValueSeconds(30); + + AsyncFailStaleReplicaTask(SegmentReplicationPressureService pressureService) { + super(logger, pressureService.threadPool, INTERVAL, true); this.pressureService = pressureService; rescheduleIfNecessary(); } @@ -207,11 +209,29 @@ protected boolean mustReschedule() { protected void runInternal() { if (pressureService.isSegmentReplicationBackpressureEnabled) { final SegmentReplicationStats stats = pressureService.tracker.getStats(); + long highestCurrentReplicationTimeMillis = 0; + ShardId shardIdWithHighestCurrentReplicationTime = null; + + // Find the shardId in node which is having stale replicas with highest current replication time. + // This way we only fail one shardId's stale replicas in every iteration of this background async task and there by decrease + // load gradually on node. 
for (Map.Entry entry : stats.getShardStats().entrySet()) { final Set staleReplicas = pressureService.getStaleReplicas( entry.getValue().getReplicaStats() ); - final ShardId shardId = entry.getKey(); + for (SegmentReplicationShardStats staleReplica : staleReplicas) { + if (staleReplica.getCurrentReplicationTimeMillis() > highestCurrentReplicationTimeMillis) { + shardIdWithHighestCurrentReplicationTime = entry.getKey(); + } + } + } + + // Fail the stale replicas of shardId having highest current replication time + if (shardIdWithHighestCurrentReplicationTime != null) { + final Set staleReplicas = pressureService.getStaleReplicas( + stats.getShardStats().get(shardIdWithHighestCurrentReplicationTime).getReplicaStats() + ); + final ShardId shardId = shardIdWithHighestCurrentReplicationTime; final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex()); final IndexShard primaryShard = indexService.getShard(shardId.getId()); for (SegmentReplicationShardStats staleReplica : staleReplicas) { From c332c159d40b59cf0be807d1b350b3856fd16d98 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 3 Apr 2023 23:57:12 +0000 Subject: [PATCH 08/10] Fix failing import. Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/SegmentReplicationPressureService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 563ada8d38924..914b57e30e1e5 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -18,7 +18,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.core.concurrency.AbstractAsyncTask; +import org.opensearch.common.util.concurrent.AbstractAsyncTask; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; From 54bae01b99d884dcd3b8b6a1a8213a55b190bb7d Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 4 Apr 2023 05:59:58 +0000 Subject: [PATCH 09/10] Address comments. 
Signed-off-by: Rishikesh1159 --- .../SegmentReplicationPressureService.java | 92 +++++++++---------- 1 file changed, 44 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 914b57e30e1e5..d0742e399de8e 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -13,6 +13,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -27,8 +28,8 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Comparator; import java.util.Set; -import java.util.Map; import java.util.stream.Collectors; /** @@ -209,59 +210,54 @@ protected boolean mustReschedule() { protected void runInternal() { if (pressureService.isSegmentReplicationBackpressureEnabled) { final SegmentReplicationStats stats = pressureService.tracker.getStats(); - long highestCurrentReplicationTimeMillis = 0; - ShardId shardIdWithHighestCurrentReplicationTime = null; // Find the shardId in node which is having stale replicas with highest current replication time. // This way we only fail one shardId's stale replicas in every iteration of this background async task and there by decrease // load gradually on node. - for (Map.Entry entry : stats.getShardStats().entrySet()) { - final Set staleReplicas = pressureService.getStaleReplicas( - entry.getValue().getReplicaStats() - ); - for (SegmentReplicationShardStats staleReplica : staleReplicas) { - if (staleReplica.getCurrentReplicationTimeMillis() > highestCurrentReplicationTimeMillis) { - shardIdWithHighestCurrentReplicationTime = entry.getKey(); - } - } - } - - // Fail the stale replicas of shardId having highest current replication time - if (shardIdWithHighestCurrentReplicationTime != null) { - final Set staleReplicas = pressureService.getStaleReplicas( - stats.getShardStats().get(shardIdWithHighestCurrentReplicationTime).getReplicaStats() - ); - final ShardId shardId = shardIdWithHighestCurrentReplicationTime; - final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex()); - final IndexShard primaryShard = indexService.getShard(shardId.getId()); - for (SegmentReplicationShardStats staleReplica : staleReplicas) { - if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) { - pressureService.shardStateAction.remoteShardFailed( - shardId, - staleReplica.getAllocationId(), - primaryShard.getOperationPrimaryTerm(), - true, - "replica too far behind primary, marking as stale", - null, - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.trace( - "Successfully failed remote shardId [{}] allocation id [{}]", - shardId, - staleReplica.getAllocationId() - ); - } - - @Override - public void onFailure(Exception e) { - logger.error("Failed to send remote shard failure", e); + stats.getShardStats() + .entrySet() + .stream() + .flatMap( + entry -> pressureService.getStaleReplicas(entry.getValue().getReplicaStats()) + .stream() + .map(r -> Tuple.tuple(entry.getKey(), 
r.getCurrentReplicationTimeMillis())) + ) + .max(Comparator.comparingLong(Tuple::v2)) + .map(Tuple::v1) + .ifPresent(shardId -> { + final Set staleReplicas = pressureService.getStaleReplicas( + stats.getShardStats().get(shardId).getReplicaStats() + ); + final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex()); + final IndexShard primaryShard = indexService.getShard(shardId.getId()); + for (SegmentReplicationShardStats staleReplica : staleReplicas) { + if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) { + pressureService.shardStateAction.remoteShardFailed( + shardId, + staleReplica.getAllocationId(), + primaryShard.getOperationPrimaryTerm(), + true, + "replica too far behind primary, marking as stale", + null, + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.trace( + "Successfully failed remote shardId [{}] allocation id [{}]", + shardId, + staleReplica.getAllocationId() + ); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to send remote shard failure", e); + } } - } - ); + ); + } } - } - } + }); } } From 1be88b7a4db806283be415f3f23d2e48ad923d70 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 4 Apr 2023 22:56:31 +0000 Subject: [PATCH 10/10] Add code doc to SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED setting. Signed-off-by: Rishikesh1159 --- .../opensearch/index/SegmentReplicationPressureService.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index d0742e399de8e..3b1d71675e75b 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -46,6 +46,10 @@ public class SegmentReplicationPressureService implements Closeable { private static final Logger logger = LogManager.getLogger(SegmentReplicationPressureService.class); + /** + * When enabled, writes will be rejected when a replica shard falls behind by both the MAX_REPLICATION_TIME_SETTING time value and MAX_INDEXING_CHECKPOINTS number of checkpoints. + * Once a shard falls behind double the MAX_REPLICATION_TIME_SETTING time value it will be marked as failed. + */ public static final Setting SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED = Setting.boolSetting( "segrep.pressure.enabled", false,
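
For readers skimming the series, the selection logic the task converges on (introduced in PATCH 07, guarded by the backpressure-enabled check from PATCH 02, and rewritten as a stream in PATCH 09) is: on each 30-second iteration, pick the single shard whose stale replicas show the highest current replication time, and fail only those of its replicas that have been lagging for more than twice the configured max replication time. The sketch below is a simplified, self-contained model of that behaviour, not the code as merged: ReplicaStat, selectStaleReplicasToFail and the threshold parameters are illustrative stand-ins for the SegmentReplicationStats/SegmentReplicationShardStats types and the remoteShardFailed call used in the diffs above, and the staleness test (checkpoints behind above a limit) stands in for the service's getStaleReplicas check.

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class StaleReplicaSelectionSketch {

    // Minimal stand-in for the per-replica stats the real task reads from its stats tracker.
    record ReplicaStat(String allocationId, long checkpointsBehind, long currentReplicationTimeMillis) {}

    // Returns the replicas that one iteration of the background task would fail:
    // first choose the shard whose stale replicas show the highest current replication time,
    // then keep only the replicas of that shard lagging more than 2x the max replication time.
    static List<ReplicaStat> selectStaleReplicasToFail(
        Map<String, List<ReplicaStat>> statsByShardId,   // shardId -> replica stats
        long maxCheckpointsBehind,
        long maxReplicationTimeMillis
    ) {
        // Shard whose laggiest stale replica has the highest current replication time.
        Optional<String> shardToActOn = statsByShardId.entrySet().stream()
            .flatMap(e -> e.getValue().stream()
                .filter(r -> r.checkpointsBehind() > maxCheckpointsBehind)   // "stale" replicas
                .map(r -> Map.entry(e.getKey(), r.currentReplicationTimeMillis())))
            .max(Comparator.comparingLong(Map.Entry::getValue))
            .map(Map.Entry::getKey);

        // Only that shard's replicas are failed, and only if they are more than 2x behind.
        return shardToActOn
            .map(shardId -> statsByShardId.get(shardId).stream()
                .filter(r -> r.checkpointsBehind() > maxCheckpointsBehind)
                .filter(r -> r.currentReplicationTimeMillis() > 2 * maxReplicationTimeMillis)
                .collect(Collectors.toList()))
            .orElse(List.of());
    }

    public static void main(String[] args) {
        Map<String, List<ReplicaStat>> stats = Map.of(
            "[idx][0]", List.of(new ReplicaStat("alloc-a", 6, 1_500)),
            "[idx][1]", List.of(new ReplicaStat("alloc-b", 9, 4_000), new ReplicaStat("alloc-c", 2, 100))
        );
        // With maxCheckpointsBehind=4 and a 500 ms max replication time, shard [idx][1] wins the
        // selection and only alloc-b (4000 ms > 2 * 500 ms) would be reported as failed.
        System.out.println(selectStaleReplicasToFail(stats, 4, 500));
    }
}

Failing only one shard group per iteration is deliberate: as the PATCH 07 commit message notes, it sheds load on the node gradually instead of failing every stale replica on the node at once.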