From c1d26782b14e2e04decd382195631f397c478acc Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Mon, 29 May 2023 23:39:40 -0700 Subject: [PATCH] add unit tests Signed-off-by: Poojita Raj --- .../cluster/ClusterChangedEvent.java | 12 +++++++ .../SegmentReplicationSourceService.java | 22 ++++++------ .../SegmentReplicationTargetService.java | 2 +- .../SegmentReplicationIndexShardTests.java | 35 ++++++++++++++++++- 4 files changed, 57 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java b/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java index 28085dd6e3860..e25b6d4637e09 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java @@ -207,6 +207,18 @@ public boolean blocksChanged() { return state.blocks() != previousState.blocks(); } + /** + * Returns true if a version upgrade has taken place in the cluster + */ + public boolean clusterUpgraded() { + // previous state was mixed version cluster and current state is not - which indicates upgrade is completed + if ((previousState.nodes().getMinNodeVersion() != previousState.nodes().getMaxNodeVersion()) + && (state.nodes().getMinNodeVersion() == state.nodes().getMaxNodeVersion())) { + return true; + } + return false; + } + /** * Returns true iff the local node is the mater node of the cluster. */ diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 3ea45bcbf50d7..0b1c09d2bdfc1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -174,21 +174,19 @@ public void clusterChanged(ClusterChangedEvent event) { } } } - if (event.nodesChanged()) { + if (event.clusterUpgraded()) { List indexShardList = new ArrayList<>(); DiscoveryNodes nodes = event.state().nodes(); - if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { - for (IndexService indexService : indicesService) { - if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) { - for (IndexShard indexShard : indexService) { - try { - if (indexShard.routingEntry().primary() - && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { - indexShardList.add(indexShard); - } - } catch (AlreadyClosedException e) { - logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); + for (IndexService indexService : indicesService) { + if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) { + for (IndexShard indexShard : indexService) { + try { + if (indexShard.routingEntry().primary() + && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { + indexShardList.add(indexShard); } + } catch (AlreadyClosedException e) { + logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index f2fcd5b5d8129..80860f2bfcc9f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe } } final Thread thread = Thread.currentThread(); - Version localNodeVersion = indicesService.clusterService().state().nodes().getLocalNode().getVersion(); + Version localNodeVersion = Version.CURRENT; // if replica's OS version is not on or after primary version, then can ignore checkpoint if (localNodeVersion.onOrAfter(receivedCheckpoint.getMinVersion()) == false) { logger.trace( diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 1ea62b0a53298..b6cca897a3901 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -334,6 +334,9 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { closeShards(primaryShard); } + /** + * Cluster version check in onNewCheckpoint method should pass when replica version is the same as the received checkpoint version. + */ public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); @@ -350,6 +353,9 @@ public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception } } + /** + * Cluster version check in onNewCheckpoint method should pass when replica version is ahead of the received checkpoint version. + */ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); @@ -366,7 +372,7 @@ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exceptio 0L, 0L, replica.getDefaultCodecName(), - Version.V_2_7_0 + Version.fromId(Version.CURRENT.id - 1) ); spy.onNewCheckpoint(checkpoint, spyShard); // passed the cluster version check and moved on to shouldProcessCheckpoint @@ -374,6 +380,33 @@ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exceptio } } + /** + * Cluster version check in onNewCheckpoint method should fail when replica version is behind the received checkpoint version. + */ + public void testClusterVersionCheckFailOnNewCheckpointBehindVersion() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + IndexShard replica = shards.getReplicas().get(0); + SegmentReplicationTargetService sut; + sut = prepareForReplication(primary, replica); + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(replica); + ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + replica.shardId(), + 0L, + 0L, + 0L, + 0L, + replica.getDefaultCodecName(), + Version.fromId(Version.CURRENT.id + 1) + ); + spy.onNewCheckpoint(checkpoint, spyShard); + // did not pass the version check and returned before shouldProcessCheckpoint method + verify(spyShard, times(0)).shouldProcessCheckpoint(checkpoint); + } + } + public void testReplicaReceivesGenIncrease() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll();