diff --git a/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java b/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java
index 28085dd6e3860..e25b6d4637e09 100644
--- a/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java
+++ b/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java
@@ -207,6 +207,18 @@ public boolean blocksChanged() {
return state.blocks() != previousState.blocks();
}
+ /**
+ * Returns true
if a version upgrade has taken place in the cluster
+ */
+ public boolean clusterUpgraded() {
+ // previous state was mixed version cluster and current state is not - which indicates upgrade is completed
+ if ((previousState.nodes().getMinNodeVersion() != previousState.nodes().getMaxNodeVersion())
+ && (state.nodes().getMinNodeVersion() == state.nodes().getMaxNodeVersion())) {
+ return true;
+ }
+ return false;
+ }
+
/**
* Returns true
iff the local node is the mater node of the cluster.
*/
diff --git a/server/src/main/java/org/opensearch/index/codec/CodecService.java b/server/src/main/java/org/opensearch/index/codec/CodecService.java
index 83d01f7558110..e4f176c33414a 100644
--- a/server/src/main/java/org/opensearch/index/codec/CodecService.java
+++ b/server/src/main/java/org/opensearch/index/codec/CodecService.java
@@ -41,6 +41,7 @@
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.mapper.MapperService;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -60,7 +61,8 @@ public class CodecService {
public static final String BEST_COMPRESSION_CODEC = "best_compression";
/** the raw unfiltered lucene default. useful for testing */
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
- public static Map opensearchVersionToLuceneCodec = new HashMap<>();
+ static Map versionStringMap = new HashMap<>();
+ public static Map opensearchVersionToLuceneCodec;
public CodecService(@Nullable MapperService mapperService, Logger logger) {
loadMap();
@@ -80,10 +82,11 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) {
}
public void loadMap() {
- opensearchVersionToLuceneCodec.put(Version.fromString("3.0.0"), "Lucene95");
- opensearchVersionToLuceneCodec.put(Version.fromString("2.8.0"), "Lucene95");
- opensearchVersionToLuceneCodec.put(Version.fromString("2.7.1"), "Lucene95");
- opensearchVersionToLuceneCodec.put(Version.fromString("2.7.0"), "Lucene95");
+ versionStringMap.put(Version.fromString("3.0.0"), "Lucene95");
+ versionStringMap.put(Version.fromString("2.8.0"), "Lucene95");
+ versionStringMap.put(Version.fromString("2.7.1"), "Lucene95");
+ versionStringMap.put(Version.fromString("2.7.0"), "Lucene95");
+ opensearchVersionToLuceneCodec = Collections.unmodifiableMap(new HashMap<>(versionStringMap));
}
public Codec codec(String name) {
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java
index 3ea45bcbf50d7..0b1c09d2bdfc1 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java
@@ -174,21 +174,19 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}
}
- if (event.nodesChanged()) {
+ if (event.clusterUpgraded()) {
List indexShardList = new ArrayList<>();
DiscoveryNodes nodes = event.state().nodes();
- if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) {
- for (IndexService indexService : indicesService) {
- if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) {
- for (IndexShard indexShard : indexService) {
- try {
- if (indexShard.routingEntry().primary()
- && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
- indexShardList.add(indexShard);
- }
- } catch (AlreadyClosedException e) {
- logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
+ for (IndexService indexService : indicesService) {
+ if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) {
+ for (IndexShard indexShard : indexService) {
+ try {
+ if (indexShard.routingEntry().primary()
+ && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
+ indexShardList.add(indexShard);
}
+ } catch (AlreadyClosedException e) {
+ logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
}
}
}
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
index 110d1c6e4436a..1bb13bd5fec4d 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
@@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
}
}
final Thread thread = Thread.currentThread();
- Version localNodeVersion = indicesService.clusterService().state().nodes().getLocalNode().getVersion();
+ Version localNodeVersion = Version.CURRENT;
// if replica's OS version is not on or after primary version, then can ignore checkpoint
if (localNodeVersion.onOrAfter(receivedCheckpoint.getMinVersion()) == false) {
logger.trace(
diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
index a887082fce879..5e4b45568e01d 100644
--- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
@@ -335,13 +335,16 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
closeShards(primaryShard);
}
+ /**
+ * Cluster version check in onNewCheckpoint method should pass when replica version is the same as the received checkpoint version.
+ */
public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
IndexShard replica = shards.getReplicas().get(0);
SegmentReplicationTargetService sut;
- sut = prepareForReplication(primary, replica);
+ sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class));
SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(replica);
ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(replica.shardId(), 0L, 0L, 0L, replica.getDefaultCodecName());
@@ -351,13 +354,16 @@ public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception
}
}
+ /**
+ * Cluster version check in onNewCheckpoint method should pass when replica version is ahead of the received checkpoint version.
+ */
public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
IndexShard replica = shards.getReplicas().get(0);
SegmentReplicationTargetService sut;
- sut = prepareForReplication(primary, replica);
+ sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class));
SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(replica);
ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
@@ -367,7 +373,7 @@ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exceptio
0L,
0L,
replica.getDefaultCodecName(),
- Version.V_2_7_0
+ Version.fromId(Version.CURRENT.id - 1)
);
spy.onNewCheckpoint(checkpoint, spyShard);
// passed the cluster version check and moved on to shouldProcessCheckpoint
@@ -375,6 +381,33 @@ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exceptio
}
}
+ /**
+ * Cluster version check in onNewCheckpoint method should fail when replica version is behind the received checkpoint version.
+ */
+ public void testClusterVersionCheckFailOnNewCheckpointBehindVersion() throws Exception {
+ try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
+ shards.startAll();
+ final IndexShard primary = shards.getPrimary();
+ IndexShard replica = shards.getReplicas().get(0);
+ SegmentReplicationTargetService sut;
+ sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class));
+ SegmentReplicationTargetService spy = spy(sut);
+ IndexShard spyShard = spy(replica);
+ ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
+ replica.shardId(),
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ replica.getDefaultCodecName(),
+ Version.fromId(Version.CURRENT.id + 1)
+ );
+ spy.onNewCheckpoint(checkpoint, spyShard);
+ // did not pass the version check and returned before shouldProcessCheckpoint method
+ verify(spyShard, times(0)).shouldProcessCheckpoint(checkpoint);
+ }
+ }
+
public void testReplicaReceivesGenIncrease() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();