Skip to content

Commit

Permalink
add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed May 30, 2023
1 parent a563655 commit c1d2678
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,18 @@ public boolean blocksChanged() {
return state.blocks() != previousState.blocks();
}

/**
* Returns <code>true</code> if a version upgrade has taken place in the cluster
*/
public boolean clusterUpgraded() {
// previous state was mixed version cluster and current state is not - which indicates upgrade is completed
if ((previousState.nodes().getMinNodeVersion() != previousState.nodes().getMaxNodeVersion())
&& (state.nodes().getMinNodeVersion() == state.nodes().getMaxNodeVersion())) {
return true;
}
return false;
}

/**
* Returns <code>true</code> iff the local node is the mater node of the cluster.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,21 +174,19 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}
}
if (event.nodesChanged()) {
if (event.clusterUpgraded()) {
List<IndexShard> indexShardList = new ArrayList<>();
DiscoveryNodes nodes = event.state().nodes();
if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) {
for (IndexService indexService : indicesService) {
if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) {
for (IndexShard indexShard : indexService) {
try {
if (indexShard.routingEntry().primary()
&& (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
indexShardList.add(indexShard);
}
} catch (AlreadyClosedException e) {
logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
for (IndexService indexService : indicesService) {
if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) {
for (IndexShard indexShard : indexService) {
try {
if (indexShard.routingEntry().primary()
&& (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
indexShardList.add(indexShard);
}
} catch (AlreadyClosedException e) {
logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
}
}
final Thread thread = Thread.currentThread();
Version localNodeVersion = indicesService.clusterService().state().nodes().getLocalNode().getVersion();
Version localNodeVersion = Version.CURRENT;
// if replica's OS version is not on or after primary version, then can ignore checkpoint
if (localNodeVersion.onOrAfter(receivedCheckpoint.getMinVersion()) == false) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
closeShards(primaryShard);
}

/**
* Cluster version check in onNewCheckpoint method should pass when replica version is the same as the received checkpoint version.
*/
public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
Expand All @@ -350,6 +353,9 @@ public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception
}
}

/**
* Cluster version check in onNewCheckpoint method should pass when replica version is ahead of the received checkpoint version.
*/
public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
Expand All @@ -366,14 +372,41 @@ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exceptio
0L,
0L,
replica.getDefaultCodecName(),
Version.V_2_7_0
Version.fromId(Version.CURRENT.id - 1)
);
spy.onNewCheckpoint(checkpoint, spyShard);
// passed the cluster version check and moved on to shouldProcessCheckpoint
verify(spyShard, times(1)).shouldProcessCheckpoint(checkpoint);
}
}

/**
* Cluster version check in onNewCheckpoint method should fail when replica version is behind the received checkpoint version.
*/
public void testClusterVersionCheckFailOnNewCheckpointBehindVersion() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
IndexShard replica = shards.getReplicas().get(0);
SegmentReplicationTargetService sut;
sut = prepareForReplication(primary, replica);
SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(replica);
ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
replica.shardId(),
0L,
0L,
0L,
0L,
replica.getDefaultCodecName(),
Version.fromId(Version.CURRENT.id + 1)
);
spy.onNewCheckpoint(checkpoint, spyShard);
// did not pass the version check and returned before shouldProcessCheckpoint method
verify(spyShard, times(0)).shouldProcessCheckpoint(checkpoint);
}
}

public void testReplicaReceivesGenIncrease() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
Expand Down

0 comments on commit c1d2678

Please sign in to comment.