From 9753add2c41a421211ac525e433320dce5a81204 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 31 Aug 2022 18:28:22 -0700 Subject: [PATCH] Test failures Signed-off-by: Suraj Singh --- .../SegmentReplicationTargetService.java | 14 +++++--------- .../common/ReplicationCollection.java | 16 +++++++++------- .../SegmentReplicationTargetServiceTests.java | 1 - 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 5d9563c45c174..8fc53ccd3bc08 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -36,7 +36,6 @@ import org.opensearch.transport.TransportService; import java.util.Map; -import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; /** @@ -152,17 +151,14 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe } else { latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint); } - Optional ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget( - replicaShard.shardId() - ); - if (ongoingReplicationTarget.isPresent()) { - final SegmentReplicationTarget target = ongoingReplicationTarget.get(); - if (target.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) { + SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId()); + if (ongoingReplicationTarget != null) { + if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) { logger.trace( "Cancelling ongoing replication from old primary with primary term {}", - target.getCheckpoint().getPrimaryTerm() + ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() ); - onGoingReplications.cancel(target.getId(), "Cancelling stuck target after new primary"); + onGoingReplications.cancel(ongoingReplicationTarget.getId(), "Cancelling stuck target after new primary"); } else { logger.trace( () -> new ParameterizedMessage( diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index f2c626c410b68..20600856c9444 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -48,9 +48,8 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Optional; import java.util.concurrent.ConcurrentMap; -import java.util.stream.Stream; +import java.util.stream.Collectors; /** * This class holds a collection of all on going replication events on the current node (i.e., the node is the target node @@ -241,12 +240,15 @@ public boolean cancelForShard(ShardId shardId, String reason) { * Get target for shard * * @param shardId shardId - * @return Optional ReplicationTarget for input shardId + * @return ReplicationTarget for input shardId */ - public Optional getOngoingReplicationTarget(ShardId shardId) { - final Stream replicationTargets = onGoingTargetEvents.values().stream().filter(t -> t.indexShard.shardId().equals(shardId)); - assert replicationTargets.count() == 1 : "More than one on-going replication targets"; - return replicationTargets.findAny(); + public T getOngoingReplicationTarget(ShardId shardId) { + final List replicationTargetList = onGoingTargetEvents.values() + .stream() + .filter(t -> t.indexShard.shardId().equals(shardId)) + .collect(Collectors.toList()); + assert replicationTargetList.size() <= 1 : "More than one on-going replication targets"; + return replicationTargetList.size() > 0 ? replicationTargetList.get(0) : null; } /** diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 339937717ab2b..1d253b0a9a300 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -76,7 +76,6 @@ public void setUp() throws Exception { initialCheckpoint.getSeqNo(), initialCheckpoint.getSegmentInfosVersion() + 1 ); - newPrimaryCheckpoint = new ReplicationCheckpoint( initialCheckpoint.getShardId(), initialCheckpoint.getPrimaryTerm() + 1,