Skip to content

Commit

Permalink
add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed May 31, 2023
1 parent 1727637 commit 8745d50
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,18 @@ public boolean blocksChanged() {
return state.blocks() != previousState.blocks();
}

/**
* Returns <code>true</code> if a version upgrade has taken place in the cluster
*/
public boolean clusterUpgraded() {
// previous state was mixed version cluster and current state is not - which indicates upgrade is completed
if ((previousState.nodes().getMinNodeVersion() != previousState.nodes().getMaxNodeVersion())
&& (state.nodes().getMinNodeVersion() == state.nodes().getMaxNodeVersion())) {
return true;
}
return false;
}

/**
* Returns <code>true</code> iff the local node is the mater node of the cluster.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.mapper.MapperService;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -60,7 +61,8 @@ public class CodecService {
public static final String BEST_COMPRESSION_CODEC = "best_compression";
/** the raw unfiltered lucene default. useful for testing */
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
public static Map<Version, String> opensearchVersionToLuceneCodec = new HashMap<>();
static Map<Version, String> versionStringMap = new HashMap<>();
public static Map<Version, String> opensearchVersionToLuceneCodec;

public CodecService(@Nullable MapperService mapperService, Logger logger) {
loadMap();
Expand All @@ -80,10 +82,11 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) {
}

public void loadMap() {
opensearchVersionToLuceneCodec.put(Version.fromString("3.0.0"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.8.0"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.7.1"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.7.0"), "Lucene95");
versionStringMap.put(Version.fromString("3.0.0"), "Lucene95");
versionStringMap.put(Version.fromString("2.8.0"), "Lucene95");
versionStringMap.put(Version.fromString("2.7.1"), "Lucene95");
versionStringMap.put(Version.fromString("2.7.0"), "Lucene95");
opensearchVersionToLuceneCodec = Collections.unmodifiableMap(new HashMap<>(versionStringMap));
}

public Codec codec(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,21 +174,19 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}
}
if (event.nodesChanged()) {
if (event.clusterUpgraded()) {
List<IndexShard> indexShardList = new ArrayList<>();
DiscoveryNodes nodes = event.state().nodes();
if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) {
for (IndexService indexService : indicesService) {
if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) {
for (IndexShard indexShard : indexService) {
try {
if (indexShard.routingEntry().primary()
&& (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
indexShardList.add(indexShard);
}
} catch (AlreadyClosedException e) {
logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
for (IndexService indexService : indicesService) {
if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) {
for (IndexShard indexShard : indexService) {
try {
if (indexShard.routingEntry().primary()
&& (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
indexShardList.add(indexShard);
}
} catch (AlreadyClosedException e) {
logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
}
}
final Thread thread = Thread.currentThread();
Version localNodeVersion = indicesService.clusterService().state().nodes().getLocalNode().getVersion();
Version localNodeVersion = Version.CURRENT;
// if replica's OS version is not on or after primary version, then can ignore checkpoint
if (localNodeVersion.onOrAfter(receivedCheckpoint.getMinVersion()) == false) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,16 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
closeShards(primaryShard);
}

/**
* Cluster version check in onNewCheckpoint method should pass when replica version is the same as the received checkpoint version.
*/
public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
IndexShard replica = shards.getReplicas().get(0);
SegmentReplicationTargetService sut;
sut = prepareForReplication(primary, replica);
sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class));
SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(replica);
ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(replica.shardId(), 0L, 0L, 0L, replica.getDefaultCodecName());
Expand All @@ -351,13 +354,16 @@ public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception
}
}

/**
* Cluster version check in onNewCheckpoint method should pass when replica version is ahead of the received checkpoint version.
*/
public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
IndexShard replica = shards.getReplicas().get(0);
SegmentReplicationTargetService sut;
sut = prepareForReplication(primary, replica);
sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class));
SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(replica);
ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
Expand All @@ -367,14 +373,41 @@ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exceptio
0L,
0L,
replica.getDefaultCodecName(),
Version.V_2_7_0
Version.fromId(Version.CURRENT.id - 1)
);
spy.onNewCheckpoint(checkpoint, spyShard);
// passed the cluster version check and moved on to shouldProcessCheckpoint
verify(spyShard, times(1)).shouldProcessCheckpoint(checkpoint);
}
}

/**
* Cluster version check in onNewCheckpoint method should fail when replica version is behind the received checkpoint version.
*/
public void testClusterVersionCheckFailOnNewCheckpointBehindVersion() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
IndexShard replica = shards.getReplicas().get(0);
SegmentReplicationTargetService sut;
sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class));
SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(replica);
ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
replica.shardId(),
0L,
0L,
0L,
0L,
replica.getDefaultCodecName(),
Version.fromId(Version.CURRENT.id + 1)
);
spy.onNewCheckpoint(checkpoint, spyShard);
// did not pass the version check and returned before shouldProcessCheckpoint method
verify(spyShard, times(0)).shouldProcessCheckpoint(checkpoint);
}
}

public void testReplicaReceivesGenIncrease() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
Expand Down

0 comments on commit 8745d50

Please sign in to comment.