diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 0e62a4320e3f3..3ea45bcbf50d7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -11,11 +11,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.action.support.ChannelActionListener; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -37,7 +39,9 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -170,6 +174,35 @@ public void clusterChanged(ClusterChangedEvent event) { } } } + if (event.nodesChanged()) { + List indexShardList = new ArrayList<>(); + DiscoveryNodes nodes = event.state().nodes(); + if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { + for (IndexService indexService : indicesService) { + if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) { + for (IndexShard indexShard : indexService) { + try { + if (indexShard.routingEntry().primary() + && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { + indexShardList.add(indexShard); + } + } catch (AlreadyClosedException e) { + logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); + } + } + } + } + } + try { + if (indexShardList.isEmpty() == false) { + for (IndexShard indexShard : indexShardList) { + indexShard.resetEngine(); + } + } + } catch (Exception e) { + logger.error("Received unexpected exception: [{}]", e.getMessage()); + } + } } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java deleted file mode 100644 index fd0bfd52f5c81..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.AlreadyClosedException; -import org.opensearch.cluster.ClusterChangedEvent; -import org.opensearch.cluster.ClusterStateListener; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.index.IndexService; -import org.opensearch.index.shard.IndexShard; -import org.opensearch.indices.IndicesService; - -import java.util.ArrayList; -import java.util.List; - -/** - * SegmentReplicationUpgradeListener is used to upgrade the opensearch version used by all primaries of a cluster when - * segment replication is enabled and a rolling upgrade is completed (while in mixed cluster state, the primaries use lower codec - * version on their primaries and this needs to be reset once upgrade is complete). - */ -public class SegmentReplicationUpgradeListener implements ClusterStateListener { - - private static final Logger logger = LogManager.getLogger(SegmentReplicationUpgradeListener.class); - - private final IndicesService indicesService; - - public SegmentReplicationUpgradeListener(IndicesService indicesService) { - this.indicesService = indicesService; - } - - @Override - public void clusterChanged(ClusterChangedEvent event) { - if (event.nodesChanged()) { - List indexShardList = new ArrayList<>(); - DiscoveryNodes nodes = event.state().nodes(); - if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - try { - if (indexShard.indexSettings().isSegRepEnabled() - && indexShard.indexSettings().getNumberOfReplicas() > 0 - && indexShard.routingEntry().primary() - && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { - indexShardList.add(indexShard); - } - } catch (AlreadyClosedException e) { - logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); - } - } - } - } - try { - if (indexShardList.isEmpty() == false) { - for (IndexShard indexShard : indexShardList) { - indexShard.resetEngine(); - } - } - } catch (Exception e) { - logger.error("Received unexpected exception: [{}]", e.getMessage()); - } - } - - } - -} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java deleted file mode 100644 index 8b4dc783be146..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication; - -import org.opensearch.cluster.service.ClusterApplierService; -import org.opensearch.common.lease.Releasable; -import org.opensearch.indices.IndicesService; - -/** - * SegmentReplicationUpgradeService is used to manage SegmentReplicationUpgradeListener's lifecycle (creation/deletion). - */ -public class SegmentReplicationUpgradeService implements Releasable { - - private final ClusterApplierService clusterApplierService; - private final SegmentReplicationUpgradeListener clusterStateListener; - - public SegmentReplicationUpgradeService(IndicesService indicesService, ClusterApplierService clusterApplierService) { - SegmentReplicationUpgradeListener clusterStateListener = new SegmentReplicationUpgradeListener(indicesService); - this.clusterApplierService = clusterApplierService; - this.clusterStateListener = clusterStateListener; - this.clusterApplierService.addListener(this.clusterStateListener); - } - - @Override - public void close() { - this.clusterApplierService.removeListener(this.clusterStateListener); - } -} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 524b646813722..a25bac60f49b6 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -52,7 +52,6 @@ import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.extensions.ExtensionsManager; import org.opensearch.extensions.NoopExtensionsManager; -import org.opensearch.indices.replication.SegmentReplicationUpgradeService; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.monitor.fs.FsProbe; import org.opensearch.plugins.ExtensionAwarePlugin; @@ -1098,8 +1097,6 @@ protected Node( b.bind(SegmentReplicationSourceService.class) .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); } - b.bind(SegmentReplicationUpgradeService.class) - .toInstance(new SegmentReplicationUpgradeService(indicesService, clusterService.getClusterApplierService())); b.bind(HttpServerTransport.class).toInstance(httpServerTransport); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); b.bind(PersistentTasksService.class).toInstance(persistentTasksService); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 157fc9755a10b..a887082fce879 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.SegmentInfos; import org.junit.Assert; import org.opensearch.ExceptionsHelper; +import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; @@ -28,6 +29,7 @@ import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexSettings; +import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.InternalEngineFactory; @@ -103,18 +105,30 @@ public void testReplicationCheckpointNullForDocRep() throws IOException { * Test that latestReplicationCheckpoint returns ReplicationCheckpoint for segrep enabled indices */ public void testReplicationCheckpointNotNullForSegRep() throws IOException { - final IndexShard indexShard = newStartedShard(randomBoolean(), settings, new NRTReplicationEngineFactory(null)); + final IndexShard indexShard = newStartedShard(randomBoolean(), settings, new NRTReplicationEngineFactory(clusterService)); final ReplicationCheckpoint replicationCheckpoint = indexShard.getLatestReplicationCheckpoint(); assertNotNull(replicationCheckpoint); closeShards(indexShard); } public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); + assertEquals( + primary.getEngine().config().getCodecName(), + primary.getEngine().config().getBWCCodec(CodecService.DEFAULT_CODEC).getName() + ); + assertEquals(primary.getEngine().config().getClusterMinVersion(), Version.CURRENT); + + assertEquals( + replica.getEngine().config().getCodecName(), + replica.getEngine().config().getBWCCodec(CodecService.DEFAULT_CODEC).getName() + ); + assertEquals(replica.getEngine().config().getClusterMinVersion(), Version.CURRENT); + // assert before any indexing: // replica: Tuple, ReplicationCheckpoint> replicaTuple = replica.getLatestSegmentInfosAndCheckpoint(); @@ -160,6 +174,8 @@ private void assertReplicationCheckpoint(IndexShard shard, SegmentInfos segmentI assertNotNull(segmentInfos); assertEquals(checkpoint.getSegmentInfosVersion(), segmentInfos.getVersion()); assertEquals(checkpoint.getSegmentsGen(), segmentInfos.getGeneration()); + assertEquals(checkpoint.getCodec(), shard.getEngine().config().getBWCCodec(CodecService.DEFAULT_CODEC).getName()); + assertEquals(checkpoint.getMinVersion(), Version.CURRENT); } public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException { @@ -170,7 +186,7 @@ public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException public void testSegmentReplication_Index_Update_Delete() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; - try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); @@ -219,7 +235,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception { } public void testIgnoreShardIdle() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -309,8 +325,8 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { primaryShard.getReplicationTracker().completeRelocationHandoff(); // Assert that primary shard is no longer in Primary Mode and shard routing is still Primary - assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode()); - assertEquals(true, primaryShard.routingEntry().primary()); + assertFalse(primaryShard.getReplicationTracker().isPrimaryMode()); + assertTrue(primaryShard.routingEntry().primary()); spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard); @@ -319,8 +335,48 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { closeShards(primaryShard); } + public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + IndexShard replica = shards.getReplicas().get(0); + SegmentReplicationTargetService sut; + sut = prepareForReplication(primary, replica); + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(replica); + ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(replica.shardId(), 0L, 0L, 0L, replica.getDefaultCodecName()); + spy.onNewCheckpoint(checkpoint, spyShard); + // passed the cluster version check and moved on to shouldProcessCheckpoint + verify(spyShard, times(1)).shouldProcessCheckpoint(checkpoint); + } + } + + public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + IndexShard replica = shards.getReplicas().get(0); + SegmentReplicationTargetService sut; + sut = prepareForReplication(primary, replica); + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(replica); + ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + replica.shardId(), + 0L, + 0L, + 0L, + 0L, + replica.getDefaultCodecName(), + Version.V_2_7_0 + ); + spy.onNewCheckpoint(checkpoint, spyShard); + // passed the cluster version check and moved on to shouldProcessCheckpoint + verify(spyShard, times(1)).shouldProcessCheckpoint(checkpoint); + } + } + public void testReplicaReceivesGenIncrease() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -562,7 +618,7 @@ public void testReplicaReceivesLowerGeneration() throws Exception { // index docs on new primary and flush // replicate to all. // Expected result: State Gens: P[4], R-1 [4], R-2 [4] - try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica_1 = shards.getReplicas().get(0); @@ -593,7 +649,7 @@ public void testReplicaReceivesLowerGeneration() throws Exception { } public void testReplicaRestarts() throws Exception { - try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); IndexShard primary = shards.getPrimary(); // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. @@ -672,7 +728,15 @@ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlush .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) .build(); - try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(null), createTempDir())) { + try ( + ReplicationGroup shards = createGroup( + 1, + settings, + indexMapping, + new NRTReplicationEngineFactory(clusterService), + createTempDir() + ) + ) { shards.startAll(); IndexShard oldPrimary = shards.getPrimary(); final IndexShard nextPrimary = shards.getReplicas().get(0); @@ -740,7 +804,7 @@ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlush } public void testNRTReplicaPromotedAsPrimary() throws Exception { - try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); IndexShard oldPrimary = shards.getPrimary(); final IndexShard nextPrimary = shards.getReplicas().get(0); @@ -808,7 +872,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception { } public void testReplicaPromotedWhileReplicating() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard oldPrimary = shards.getPrimary(); final IndexShard nextPrimary = shards.getReplicas().get(0); @@ -884,7 +948,7 @@ public void onFailure(Exception e) { } public void testReplicaClosesWhileReplicating_AfterGetCheckpoint() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -926,7 +990,7 @@ public void getSegmentFiles( } public void testReplicaClosesWhileReplicating_AfterGetSegmentFiles() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -968,7 +1032,7 @@ public void getSegmentFiles( } public void testPrimaryCancelsExecution() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index eae070b98c4a1..5f1217ef4cffb 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -108,7 +108,7 @@ public void testTranslogHistoryTransferred() throws Exception { public void testWithSegmentReplication_ReplicaUsesPrimaryTranslogUUID() throws Exception { Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); - try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final String expectedUUID = getTranslog(shards.getPrimary()).getTranslogUUID(); assertTrue( diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index ff251f42ab21b..719241452d65a 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -27,7 +27,7 @@ public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLe .build(); public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { - try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory(null))) { // Step1 - Start primary, index docs and flush shards.startPrimary(); diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index 677352cdd5120..42e3e2c1f6b93 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -67,7 +67,7 @@ public class OngoingSegmentReplicationsTests extends IndexShardTestCase { public void setUp() throws Exception { super.setUp(); primary = newStartedShard(true, settings); - replica = newShard(false, settings, new NRTReplicationEngineFactory()); + replica = newShard(false, settings, new NRTReplicationEngineFactory(null)); recoverReplica(replica, primary, true); replicaDiscoveryNode = replica.recoveryState().getTargetNode(); primaryDiscoveryNode = replica.recoveryState().getSourceNode(); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index 607f9dd91e35e..c6dc8b797a6cb 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -52,7 +52,7 @@ public void setUp() throws Exception { super.setUp(); final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT").put(Settings.EMPTY).build(); primary = newStartedShard(true, settings); - replica = newShard(false, settings, new NRTReplicationEngineFactory()); + replica = newShard(false, settings, new NRTReplicationEngineFactory(null)); recoverReplica(replica, primary, true); replicaDiscoveryNode = replica.recoveryState().getTargetNode(); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 1d1777758972c..911b7d6e3844f 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -88,7 +88,7 @@ public void setUp() throws Exception { .build(); primaryShard = newStartedShard(true, settings); String primaryCodec = primaryShard.getLatestReplicationCheckpoint().getCodec(); - replicaShard = newShard(false, settings, new NRTReplicationEngineFactory()); + replicaShard = newShard(false, settings, new NRTReplicationEngineFactory(null)); recoverReplica(replicaShard, primaryShard, true, getReplicationFunc(replicaShard)); checkpoint = new ReplicationCheckpoint( replicaShard.shardId(), diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 0e711af1afa62..cc51656cb2be2 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -93,7 +93,7 @@ public void setUp() throws Exception { .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); + indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory(null)); spyIndexShard = spy(indexShard); Mockito.doNothing().when(spyIndexShard).finalizeReplication(any(SegmentInfos.class)); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index b1dd4fb1dcc1e..6c05687b3ce3f 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1354,6 +1354,7 @@ public void getSegmentFiles( }; when(sourceFactory.get(any())).thenReturn(replicationSource); when(indicesService.getShardOrNull(any())).thenReturn(target); + when(indicesService.clusterService()).thenReturn(clusterService); return targetService; }