From 8745d50cee32e03f0a4ff401c55a57938b0aab88 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Mon, 29 May 2023 23:39:40 -0700 Subject: [PATCH] add unit tests Signed-off-by: Poojita Raj --- .../cluster/ClusterChangedEvent.java | 12 ++++++ .../opensearch/index/codec/CodecService.java | 13 ++++--- .../SegmentReplicationSourceService.java | 22 +++++------ .../SegmentReplicationTargetService.java | 2 +- .../SegmentReplicationIndexShardTests.java | 39 +++++++++++++++++-- 5 files changed, 67 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java b/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java index 28085dd6e3860..e25b6d4637e09 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java @@ -207,6 +207,18 @@ public boolean blocksChanged() { return state.blocks() != previousState.blocks(); } + /** + * Returns true if a version upgrade has taken place in the cluster + */ + public boolean clusterUpgraded() { + // previous state was mixed version cluster and current state is not - which indicates upgrade is completed + if ((previousState.nodes().getMinNodeVersion() != previousState.nodes().getMaxNodeVersion()) + && (state.nodes().getMinNodeVersion() == state.nodes().getMaxNodeVersion())) { + return true; + } + return false; + } + /** * Returns true iff the local node is the mater node of the cluster. */ diff --git a/server/src/main/java/org/opensearch/index/codec/CodecService.java b/server/src/main/java/org/opensearch/index/codec/CodecService.java index 83d01f7558110..e4f176c33414a 100644 --- a/server/src/main/java/org/opensearch/index/codec/CodecService.java +++ b/server/src/main/java/org/opensearch/index/codec/CodecService.java @@ -41,6 +41,7 @@ import org.opensearch.common.collect.MapBuilder; import org.opensearch.index.mapper.MapperService; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -60,7 +61,8 @@ public class CodecService { public static final String BEST_COMPRESSION_CODEC = "best_compression"; /** the raw unfiltered lucene default. useful for testing */ public static final String LUCENE_DEFAULT_CODEC = "lucene_default"; - public static Map opensearchVersionToLuceneCodec = new HashMap<>(); + static Map versionStringMap = new HashMap<>(); + public static Map opensearchVersionToLuceneCodec; public CodecService(@Nullable MapperService mapperService, Logger logger) { loadMap(); @@ -80,10 +82,11 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) { } public void loadMap() { - opensearchVersionToLuceneCodec.put(Version.fromString("3.0.0"), "Lucene95"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.8.0"), "Lucene95"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.7.1"), "Lucene95"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.7.0"), "Lucene95"); + versionStringMap.put(Version.fromString("3.0.0"), "Lucene95"); + versionStringMap.put(Version.fromString("2.8.0"), "Lucene95"); + versionStringMap.put(Version.fromString("2.7.1"), "Lucene95"); + versionStringMap.put(Version.fromString("2.7.0"), "Lucene95"); + opensearchVersionToLuceneCodec = Collections.unmodifiableMap(new HashMap<>(versionStringMap)); } public Codec codec(String name) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 3ea45bcbf50d7..0b1c09d2bdfc1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -174,21 +174,19 @@ public void clusterChanged(ClusterChangedEvent event) { } } } - if (event.nodesChanged()) { + if (event.clusterUpgraded()) { List indexShardList = new ArrayList<>(); DiscoveryNodes nodes = event.state().nodes(); - if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { - for (IndexService indexService : indicesService) { - if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) { - for (IndexShard indexShard : indexService) { - try { - if (indexShard.routingEntry().primary() - && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { - indexShardList.add(indexShard); - } - } catch (AlreadyClosedException e) { - logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); + for (IndexService indexService : indicesService) { + if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) { + for (IndexShard indexShard : indexService) { + try { + if (indexShard.routingEntry().primary() + && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { + indexShardList.add(indexShard); } + } catch (AlreadyClosedException e) { + logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 110d1c6e4436a..1bb13bd5fec4d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe } } final Thread thread = Thread.currentThread(); - Version localNodeVersion = indicesService.clusterService().state().nodes().getLocalNode().getVersion(); + Version localNodeVersion = Version.CURRENT; // if replica's OS version is not on or after primary version, then can ignore checkpoint if (localNodeVersion.onOrAfter(receivedCheckpoint.getMinVersion()) == false) { logger.trace( diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index a887082fce879..5e4b45568e01d 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -335,13 +335,16 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { closeShards(primaryShard); } + /** + * Cluster version check in onNewCheckpoint method should pass when replica version is the same as the received checkpoint version. + */ public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); IndexShard replica = shards.getReplicas().get(0); SegmentReplicationTargetService sut; - sut = prepareForReplication(primary, replica); + sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class)); SegmentReplicationTargetService spy = spy(sut); IndexShard spyShard = spy(replica); ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(replica.shardId(), 0L, 0L, 0L, replica.getDefaultCodecName()); @@ -351,13 +354,16 @@ public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception } } + /** + * Cluster version check in onNewCheckpoint method should pass when replica version is ahead of the received checkpoint version. + */ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); IndexShard replica = shards.getReplicas().get(0); SegmentReplicationTargetService sut; - sut = prepareForReplication(primary, replica); + sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class)); SegmentReplicationTargetService spy = spy(sut); IndexShard spyShard = spy(replica); ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( @@ -367,7 +373,7 @@ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exceptio 0L, 0L, replica.getDefaultCodecName(), - Version.V_2_7_0 + Version.fromId(Version.CURRENT.id - 1) ); spy.onNewCheckpoint(checkpoint, spyShard); // passed the cluster version check and moved on to shouldProcessCheckpoint @@ -375,6 +381,33 @@ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exceptio } } + /** + * Cluster version check in onNewCheckpoint method should fail when replica version is behind the received checkpoint version. + */ + public void testClusterVersionCheckFailOnNewCheckpointBehindVersion() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + IndexShard replica = shards.getReplicas().get(0); + SegmentReplicationTargetService sut; + sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class)); + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(replica); + ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + replica.shardId(), + 0L, + 0L, + 0L, + 0L, + replica.getDefaultCodecName(), + Version.fromId(Version.CURRENT.id + 1) + ); + spy.onNewCheckpoint(checkpoint, spyShard); + // did not pass the version check and returned before shouldProcessCheckpoint method + verify(spyShard, times(0)).shouldProcessCheckpoint(checkpoint); + } + } + public void testReplicaReceivesGenIncrease() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll();